Documented concurrency control between filewriter thread and log clients.

Changed log header text, replaced SkySQL GAteway with SkySQL MaxScale.
Fixed bug in log flushing. skygw_log_write_flush didn't cause call of fsync, which suspended file writing.
This commit is contained in:
vraatikka
2013-07-25 16:21:13 +03:00
parent 003db6eaa9
commit 52564314d4
3 changed files with 69 additions and 34 deletions

View File

@ -477,11 +477,8 @@ static int logmanager_write_log(
logfile_t* lf; logfile_t* lf;
char* wp; char* wp;
int err = 0; int err = 0;
#if 0
int* refcount;
#else
blockbuf_t* bb; blockbuf_t* bb;
#endif
CHK_LOGMANAGER(lm); CHK_LOGMANAGER(lm);
if (id < LOGFILE_FIRST || id > LOGFILE_LAST) { if (id < LOGFILE_FIRST || id > LOGFILE_LAST) {
@ -508,13 +505,21 @@ static int logmanager_write_log(
lf = &lm->lm_logfile[id]; lf = &lm->lm_logfile[id];
CHK_LOGFILE(lf); CHK_LOGFILE(lf);
/** /**
* When string pointer is NULL, case is skygw_log_flush. * When string pointer is NULL, case is skygw_log_flush and no
* writing is involved. With flush && str != NULL case is
* skygw_log_write_flush.
*/ */
if (str == NULL) { if (flush) {
ss_dassert(flush); ss_dassert(flush);
ss_dassert(!(str == NULL && !flush));
logfile_flush(lf); logfile_flush(lf);
if (str == NULL) {
goto return_err; goto return_err;
} }
} else {
ss_dassert(str != NULL);
}
/** Check string length. */ /** Check string length. */
if (str_len > MAX_LOGSTRLEN) { if (str_len > MAX_LOGSTRLEN) {
@ -669,7 +674,8 @@ static char* blockbuf_get_writepos(
CHK_BLOCKBUF(bb); CHK_BLOCKBUF(bb);
/** /**
* Increase version to odd to mark list update active update. * Increase version to odd to mark list update active
* update.
*/ */
bb_list->mlist_versno += 1; bb_list->mlist_versno += 1;
ss_dassert(bb_list->mlist_versno%2 == 1); ss_dassert(bb_list->mlist_versno%2 == 1);
@ -769,19 +775,14 @@ static char* blockbuf_get_writepos(
bb->bb_buf_left -= count_len; bb->bb_buf_left -= count_len;
*/ */
/** /**
* If flush flag is set, set buffer full. As a consequence, * If flush flag is set, set buffer full. As a consequence, no-one
* it will be flushed. * can write to it before it is flushed to disk.
*/ */
bb->bb_isfull = (flush == TRUE ? TRUE : bb->bb_isfull); bb->bb_isfull = (flush == TRUE ? TRUE : bb->bb_isfull);
/** Unlock buffer */ /** Unlock buffer */
simple_mutex_unlock(&bb->bb_mutex); simple_mutex_unlock(&bb->bb_mutex);
#if 0
/** Release lock */
simple_mutex_unlock(&bb_list->mlist_mutex);
#endif
ss_dassert(bb_list->mlist_mutex->sm_lock_thr != pthread_self()); ss_dassert(bb_list->mlist_mutex->sm_lock_thr != pthread_self());
return pos; return pos;
} }
@ -1418,6 +1419,44 @@ static void filewriter_done(
} }
/**
* @node Writes block buffers of logfiles to physical log files on disk.
*
* Parameters:
* @param data - thread context, skygw_thread_t
* <description>
*
* @return
*
*
* @details Waits until receives wake-up message. Scans through block buffer
* lists of each logfile object.
*
* Block buffer is written to log file if
* 1. bb_isfull == TRUE,
* 2. logfile object's lf_flushflag == TRUE, or
* 3. skygw_thread_must_exit returns TRUE.
*
* Log file is flushed (fsync'd) in cases #2 and #3.
*
* Concurrency control : block buffer is accessed by file writer (this) and
* log clients. File writer reads and sets each logfile object's flushflag
* with spinlock. Another protected section is when block buffer's metadata is
* read, and optionally the write operation.
*
* When block buffer is marked full or flush flag is set, they don't try to
* access buffer. There may, however, be active writes, on the block in parallel
* with file writer operations. Each active write corresponds to bb_refcounter
* values. File writer doesn't write block buffer before its refcount is zero.
*
* Block buffers are located in a linked list. List is accessed by log clients,
* which add nodes if necessary, and by file writer which traverses the list
* and accesses block buffers included in list nodes. List modifications are
* protected with version numbers. Before modification, version is increased
* by one to be odd. After the completion, it is increased again to even. List
* can be read only when version is even and read is consistent only if version
* hasn't changed during the read.
*/
static void* thr_filewriter_fun( static void* thr_filewriter_fun(
void* data) void* data)
{ {
@ -1472,12 +1511,6 @@ static void* thr_filewriter_fun(
* get logfile's block buffer list * get logfile's block buffer list
*/ */
bb_list = &lf->lf_blockbuf_list; bb_list = &lf->lf_blockbuf_list;
/** Korvaa mutex version luvulla, luvulla ja uudella version luvulla.
* Joka kohdassa.
*/
#if 0
simple_mutex_lock(&bb_list->mlist_mutex, TRUE);
#endif
CHK_MLIST(bb_list); CHK_MLIST(bb_list);
node = bb_list->mlist_first; node = bb_list->mlist_first;
@ -1487,6 +1520,7 @@ static void* thr_filewriter_fun(
bb = (blockbuf_t *)node->mlnode_data; bb = (blockbuf_t *)node->mlnode_data;
CHK_BLOCKBUF(bb); CHK_BLOCKBUF(bb);
/** Lock block buffer */
simple_mutex_lock(&bb->bb_mutex, TRUE); simple_mutex_lock(&bb->bb_mutex, TRUE);
if (bb->bb_buf_used != 0 && if (bb->bb_buf_used != 0 &&
@ -1502,10 +1536,11 @@ static void* thr_filewriter_fun(
skygw_file_write(file, skygw_file_write(file,
(void *)bb->bb_buf, (void *)bb->bb_buf,
bb->bb_buf_used); bb->bb_buf_used,
(flush || flushall)); /**< call fsync */
/** /**
* Reset buffer * Reset buffer
* TODO: it may be probably faster to free and calloc * NOTE: it may be probably faster to free and calloc
* new buffer every time full one is locked for * new buffer every time full one is locked for
* file write. * file write.
*/ */
@ -1514,7 +1549,7 @@ static void* thr_filewriter_fun(
memset(bb->bb_buf, 0, bb->bb_buf_size); memset(bb->bb_buf, 0, bb->bb_buf_size);
bb->bb_isfull = FALSE; bb->bb_isfull = FALSE;
} }
/** Release lock to block buffer */
simple_mutex_unlock(&bb->bb_mutex); simple_mutex_unlock(&bb->bb_mutex);
/** Consistent lock-free read on the list */ /** Consistent lock-free read on the list */
@ -1525,9 +1560,6 @@ static void* thr_filewriter_fun(
} while (vn1 != vn2); } while (vn1 != vn2);
} /* while (node != NULL) */ } /* while (node != NULL) */
#if 0
simple_mutex_unlock(&bb_list->mlist_mutex);
#endif
} /* for */ } /* for */
} /* while (!skygw_thread_must_exit) */ } /* while (!skygw_thread_must_exit) */

View File

@ -1520,8 +1520,7 @@ static bool file_write_header(
*tm = *localtime(t); *tm = *localtime(t);
CHK_FILE(file); CHK_FILE(file);
header_buf1 = "\n----------\nSkySQL Gateway "; header_buf1 = "\n----------\nSkySQL MaxScale ";
header_buf2 = strdup(asctime(tm)); header_buf2 = strdup(asctime(tm));
if (header_buf2 == NULL) { if (header_buf2 == NULL) {
@ -1558,7 +1557,8 @@ return_succp:
bool skygw_file_write( bool skygw_file_write(
skygw_file_t* file, skygw_file_t* file,
void* data, void* data,
size_t nbytes) size_t nbytes,
bool flush)
{ {
bool succp = FALSE; bool succp = FALSE;
#if !defined(LAPTOP_TEST) #if !defined(LAPTOP_TEST)
@ -1582,10 +1582,9 @@ bool skygw_file_write(
file->sf_fname); file->sf_fname);
goto return_succp; goto return_succp;
} }
writecount += 1; writecount += 1;
if (writecount == FSYNCLIMIT) { if (flush || writecount == FSYNCLIMIT) {
fd = fileno(file->sf_file); fd = fileno(file->sf_file);
fsync(fd); fsync(fd);
writecount = 0; writecount = 0;

View File

@ -144,7 +144,11 @@ EXTERN_C_BLOCK_END
/** Skygw file routines */ /** Skygw file routines */
skygw_file_t* skygw_file_init(char* fname); skygw_file_t* skygw_file_init(char* fname);
void skygw_file_done(skygw_file_t* file); void skygw_file_done(skygw_file_t* file);
bool skygw_file_write(skygw_file_t* file, void* data, size_t nbytes); bool skygw_file_write(
skygw_file_t* file,
void* data,
size_t nbytes,
bool flush);
/** Skygw file routines */ /** Skygw file routines */
EXTERN_C_BLOCK_BEGIN EXTERN_C_BLOCK_BEGIN