Merge pull request #1 from akopytov/0.5

Merge upstream
This commit is contained in:
Vadim Tkachenko
2016-04-08 10:38:50 -07:00
8 changed files with 308 additions and 89 deletions

View File

@ -49,7 +49,7 @@ endif
sysbench_SOURCES = sysbench.c sysbench.h sb_timer.c sb_timer.h \
sb_options.c sb_options.h sb_logger.c sb_logger.h sb_list.h db_driver.h \
db_driver.c sb_percentile.c sb_percentile.h
db_driver.c sb_percentile.c sb_percentile.h sb_barrier.c sb_barrier.h
sysbench_LDADD = tests/fileio/libsbfileio.a tests/threads/libsbthreads.a \
tests/memory/libsbmemory.a tests/cpu/libsbcpu.a \

104
sysbench/sb_barrier.c Normal file
View File

@ -0,0 +1,104 @@
/*
Copyright (C) 2016 Alexey Kopytov <akopytov@gmail.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
Thread barrier implementation. It differs from pthread_barrier_t in two ways:
- it's more portable (will also work on OS X and Windows with existing
pthread_* wrappers in sb_win.c).
- it allows defining a callback function which is called right before
signaling the participating threads to continue, i.e. as soon as the
required number of threads reach the barrier. The callback can also signal
an error to sb_barrier_wait() callers by returning a non-zero value. In
which case sb_barrier_wait() returns a negative value to all callers.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "sb_barrier.h"
int sb_barrier_init(sb_barrier_t *barrier, unsigned int count,
sb_barrier_cb_t callback, void *arg)
{
if (count == 0)
return 1;
if (pthread_mutex_init(&barrier->mutex, NULL) ||
pthread_cond_init(&barrier->cond, NULL))
return 1;
barrier->init_count = count;
barrier->count = count;
barrier->callback = callback;
barrier->arg = arg;
barrier->serial = 0;
barrier->error = 0;
return 0;
}
int sb_barrier_wait(sb_barrier_t *barrier)
{
int res;
pthread_mutex_lock(&barrier->mutex);
if (!--barrier->count)
{
barrier->serial++;
barrier->count = barrier->init_count;
res = SB_BARRIER_SERIAL_THREAD;
pthread_cond_broadcast(&barrier->cond);
if (barrier->callback != NULL && barrier->callback(barrier->arg) != 0)
{
barrier->error = 1;
res = -1;
}
pthread_mutex_unlock(&barrier->mutex);
}
else
{
unsigned int serial = barrier->serial;
do {
pthread_cond_wait(&barrier->cond, &barrier->mutex);
} while (serial == barrier->serial);
res = barrier->error ? -1 : 0;
pthread_mutex_unlock(&barrier->mutex);
}
return res;
}
void sb_barrier_destroy(sb_barrier_t *barrier)
{
pthread_mutex_destroy(&barrier->mutex);
pthread_cond_destroy(&barrier->cond);
}

58
sysbench/sb_barrier.h Normal file
View File

@ -0,0 +1,58 @@
/*
Copyright (C) 2016 Alexey Kopytov <akopytov@gmail.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/* Thread barrier implementation. */
#ifndef SB_BARRIER_H
#define SB_BARRIER_H
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#ifdef HAVE_PTHREAD_H
# include <pthread.h>
#endif
#ifdef _WIN32
#include "sb_win.h"
#endif
#define SB_BARRIER_SERIAL_THREAD 1
typedef int (*sb_barrier_cb_t)(void *);
typedef struct {
unsigned int count;
unsigned int init_count;
unsigned int serial;
pthread_mutex_t mutex;
pthread_cond_t cond;
sb_barrier_cb_t callback;
void *arg;
int error;
} sb_barrier_t;
int sb_barrier_init(sb_barrier_t *barrier, unsigned int count,
sb_barrier_cb_t callback, void *arg);
int sb_barrier_wait(sb_barrier_t *barrier);
void sb_barrier_destroy(sb_barrier_t *barrier);
#endif /* SB_BARRIER_H */

View File

@ -142,12 +142,14 @@ int log_register(void)
/* Display command line options for registered log handlers */
void log_usage(void)
void log_print_help(void)
{
unsigned int i;
sb_list_item_t *pos;
log_handler_t *handler;
printf("Log options:\n");
for (i = 0; i < LOG_MSG_TYPE_MAX; i++)
{
SB_LIST_FOR_EACH(pos, handlers + i)

View File

@ -128,7 +128,7 @@ int log_register(void);
/* Display command line options for all register log handlers */
void log_usage(void);
void log_print_help(void);
/* Initialize logger */

View File

@ -40,7 +40,7 @@
#define CALL_ERROR(L, name) \
do { \
const char *err = lua_tostring(L, -1); \
log_text(LOG_DEBUG, "failed to execute function `%s': %s", \
log_text(LOG_FATAL, "failed to execute function `%s': %s", \
name, err ? err : "(null)"); \
} while (0)

View File

@ -68,6 +68,7 @@
#include "sb_options.h"
#include "scripting/sb_script.h"
#include "db_driver.h"
#include "sb_barrier.h"
#define VERSION_STRING PACKAGE" "PACKAGE_VERSION
@ -77,6 +78,9 @@
/* Maximum queue length for the tx-rate mode */
#define MAX_QUEUE_LEN 100000
/* Wait at most this number of seconds for worker threads to initialize */
#define THREAD_INIT_TIMEOUT 30
/* Random numbers distributions */
typedef enum
{
@ -163,10 +167,11 @@ sb_list_t tests;
sb_globals_t sb_globals;
sb_test_t *current_test;
/* Barrier to ensure we start the benchmark run when all workers are ready */
static sb_barrier_t thread_start_barrier;
/* Mutexes */
/* used to start test with all threads ready */
static pthread_mutex_t thread_start_mutex;
static pthread_attr_t thread_attr;
/* structures to handle queue of events, needed for tx_rate mode */
@ -178,30 +183,42 @@ static event_queue_elem_t queue_array[MAX_QUEUE_LEN];
static int queue_is_full;
static void print_header(void);
static void print_usage(void);
static void print_help(void);
static void print_run_mode(sb_test_t *);
#ifdef HAVE_ALARM
static void sigalrm_handler(int sig)
static void sigalrm_thread_init_timeout_handler(int sig)
{
if (sig == SIGALRM)
{
sb_globals.forced_shutdown_in_progress = 1;
if (sig != SIGALRM)
return;
sb_timer_stop(&sb_globals.exec_timer);
sb_timer_stop(&sb_globals.cumulative_timer1);
sb_timer_stop(&sb_globals.cumulative_timer2);
log_text(LOG_FATAL,
"Worker threads failed to initialize within %u seconds!",
THREAD_INIT_TIMEOUT);
log_text(LOG_FATAL,
"The --max-time limit has expired, forcing shutdown...");
exit(2);
}
if (current_test && current_test->ops.print_stats)
current_test->ops.print_stats(SB_STAT_CUMULATIVE);
static void sigalrm_forced_shutdown_handler(int sig)
{
if (sig != SIGALRM)
return;
log_done();
sb_globals.forced_shutdown_in_progress = 1;
exit(2);
}
sb_timer_stop(&sb_globals.exec_timer);
sb_timer_stop(&sb_globals.cumulative_timer1);
sb_timer_stop(&sb_globals.cumulative_timer2);
log_text(LOG_FATAL,
"The --max-time limit has expired, forcing shutdown...");
if (current_test && current_test->ops.print_stats)
current_test->ops.print_stats(SB_STAT_CUMULATIVE);
log_done();
exit(2);
}
#endif
@ -272,19 +289,20 @@ void print_header(void)
/* Print program usage */
void print_usage(void)
void print_help(void)
{
sb_list_item_t *pos;
sb_test_t *test;
printf("Usage:\n");
printf(" sysbench [general-options]... --test=<test-name> "
"[test-options]... command\n\n");
printf(" sysbench --test=<test-name> [options]... <command>\n\n");
printf("Commands: prepare run cleanup help version\n\n");
printf("General options:\n");
sb_print_options(general_args);
printf("Log options:\n");
log_usage();
log_print_help();
db_print_help();
printf("Compiled-in tests:\n");
SB_LIST_FOR_EACH(pos, &tests)
@ -293,7 +311,6 @@ void print_usage(void)
printf(" %s - %s\n", test->sname, test->lname);
}
printf("\n");
printf("Commands: prepare run cleanup help version\n\n");
printf("See 'sysbench --test=<name> help' for a list of options for each test.\n\n");
}
@ -456,10 +473,10 @@ void print_run_mode(sb_test_t *test)
}
/* Main runner test thread */
/* Main worker test thread */
static void *runner_thread(void *arg)
static void *worker_thread(void *arg)
{
sb_request_t request;
sb_thread_ctxt_t *ctxt;
@ -473,20 +490,20 @@ static void *runner_thread(void *arg)
test = ctxt->test;
thread_id = ctxt->id;
log_text(LOG_DEBUG, "Runner thread started (%d)!", thread_id);
if (test->ops.thread_init != NULL && test->ops.thread_init(thread_id) != 0)
{
log_text(LOG_DEBUG, "Worker thread (#%d) failed to initialize!", thread_id);
sb_globals.error = 1;
return NULL; /* thread initialization failed */
/* Avoid blocking the main thread */
sb_barrier_wait(&thread_start_barrier);
return NULL;
}
/*
We do this to make sure all threads get to this barrier
about the same time
*/
pthread_mutex_lock(&thread_start_mutex);
sb_globals.num_running++;
pthread_mutex_unlock(&thread_start_mutex);
log_text(LOG_DEBUG, "Worker thread (#%d) started!", thread_id);
/* Wait for other threads to initialize */
if (sb_barrier_wait(&thread_start_barrier) < 0)
return NULL;
do
{
@ -551,11 +568,7 @@ static void *runner_thread(void *arg)
if (test->ops.thread_done != NULL)
test->ops.thread_done(thread_id);
pthread_mutex_lock(&thread_start_mutex);
sb_globals.num_running--;
pthread_mutex_unlock(&thread_start_mutex);
return NULL;
return NULL;
}
static void *eventgen_thread_proc(void *arg)
@ -573,8 +586,9 @@ static void *eventgen_thread_proc(void *arg)
log_text(LOG_DEBUG, "Event generating thread started");
pthread_mutex_lock(&thread_start_mutex);
pthread_mutex_unlock(&thread_start_mutex);
/* Wait for other threads to initialize */
if (sb_barrier_wait(&thread_start_barrier) < 0)
return NULL;
curr_ns = sb_timer_value(&sb_globals.exec_timer);
/* emulate exponential distribution with Lambda = tx_rate */
@ -637,6 +651,12 @@ static void *report_thread_proc(void *arg)
(void)arg; /* unused */
log_text(LOG_DEBUG, "Reporting thread started");
/* Wait for other threads to initialize */
if (sb_barrier_wait(&thread_start_barrier) < 0)
return NULL;
if (current_test->ops.print_stats == NULL)
{
log_text(LOG_DEBUG, "Reporting not supported by the current test, ",
@ -644,11 +664,6 @@ static void *report_thread_proc(void *arg)
return NULL;
}
log_text(LOG_DEBUG, "Reporting thread started");
pthread_mutex_lock(&thread_start_mutex);
pthread_mutex_unlock(&thread_start_mutex);
pause_ns = interval_ns;
prev_ns = sb_timer_value(&sb_globals.exec_timer) + interval_ns;
for (;;)
@ -686,18 +701,19 @@ static void *checkpoints_thread_proc(void *arg)
(void)arg; /* unused */
log_text(LOG_DEBUG, "Checkpoints report thread started");
/* Wait for other threads to initialize */
if (sb_barrier_wait(&thread_start_barrier) < 0)
return NULL;
if (current_test->ops.print_stats == NULL)
{
log_text(LOG_DEBUG, "Reporting not supported by the current test, ",
"terminating the reporting thread");
"terminating the checkpoints thread");
return NULL;
}
log_text(LOG_DEBUG, "Checkpoints report thread started");
pthread_mutex_lock(&thread_start_mutex);
pthread_mutex_unlock(&thread_start_mutex);
for (i = 0; i < sb_globals.n_checkpoints; i++)
{
next_ns = SEC2NS(sb_globals.checkpoints[i]);
@ -723,11 +739,29 @@ static void *checkpoints_thread_proc(void *arg)
return NULL;
}
/*
Main test function. Start threads.
Wait for them to complete and measure time
*/
/* Callback to start timers when all threads are ready */
static int threads_started_callback(void *arg)
{
(void) arg; /* unused */
/* Report initialization errors to the main thread */
if (sb_globals.error)
return 1;
sb_globals.num_running = sb_globals.num_threads;
sb_timer_start(&sb_globals.exec_timer);
sb_timer_start(&sb_globals.cumulative_timer1);
sb_timer_start(&sb_globals.cumulative_timer2);
return 0;
}
/*
Main test function. Start threads.
Wait for them to complete and measure time
*/
static int run_test(sb_test_t *test)
{
@ -739,6 +773,7 @@ static int run_test(sb_test_t *test)
int report_thread_created = 0;
int checkpoints_thread_created = 0;
int eventgen_thread_created = 0;
unsigned int barrier_threads;
/* initialize test */
if (test->ops.init != NULL && test->ops.init() != 0)
@ -769,9 +804,6 @@ static int run_test(sb_test_t *test)
sb_globals.event_queue_length = 0;
queue_is_full = 0;
/* start mutex used for barrier */
pthread_mutex_init(&thread_start_mutex,NULL);
pthread_mutex_lock(&thread_start_mutex);
sb_globals.num_running = 0;
/* initialize attr */
@ -791,6 +823,19 @@ static int run_test(sb_test_t *test)
pthread_mutex_init(&rnd_mutex, NULL);
pthread_mutex_init(&report_interval_mutex, NULL);
/* Calculate the required number of threads for the start barrier */
barrier_threads = 1 + sb_globals.num_threads +
(sb_globals.report_interval > 0) +
(sb_globals.tx_rate > 0) +
(sb_globals.n_checkpoints > 0);
/* Initialize the start barrier */
if (sb_barrier_init(&thread_start_barrier, barrier_threads,
threads_started_callback, NULL)) {
log_errno(LOG_FATAL, "sb_barrier_init() failed");
return 1;
}
if (sb_globals.report_interval > 0)
{
/* Create a thread for intermediate statistic reports */
@ -827,36 +872,52 @@ static int run_test(sb_test_t *test)
checkpoints_thread_created = 1;
}
/* Starting the test threads */
/* Starting the worker threads */
for(i = 0; i < sb_globals.num_threads; i++)
{
if (sb_globals.error)
return 1;
if ((err = pthread_create(&(threads[i].thread), &thread_attr,
&runner_thread, (void*)(threads + i))) != 0)
&worker_thread, (void*)(threads + i))) != 0)
{
log_errno(LOG_FATAL, "pthread_create() for thread #%d failed.", i);
return 1;
}
}
sb_timer_start(&sb_globals.exec_timer); /* Start benchmark timer */
sb_timer_start(&sb_globals.cumulative_timer1);
sb_timer_start(&sb_globals.cumulative_timer2);
#ifdef HAVE_ALARM
/* Exit with an error if thread initialization timeout expires */
signal(SIGALRM, sigalrm_thread_init_timeout_handler);
alarm(THREAD_INIT_TIMEOUT);
#endif
log_text(LOG_NOTICE, "Initializing worker threads...\n");
if (sb_barrier_wait(&thread_start_barrier) < 0)
{
log_text(LOG_FATAL, "Thread initialization failed!");
return 1;
}
#ifdef HAVE_ALARM
/* Set the alarm to force shutdown */
alarm(0);
if (sb_globals.force_shutdown)
{
/* Set the alarm to force shutdown */
signal(SIGALRM, sigalrm_forced_shutdown_handler);
alarm(sb_globals.max_time + sb_globals.timeout);
}
#endif
pthread_mutex_unlock(&thread_start_mutex);
log_text(LOG_NOTICE, "Threads started!\n");
log_text(LOG_NOTICE, "Threads started!\n");
for(i = 0; i < sb_globals.num_threads; i++)
{
if((err = pthread_join(threads[i].thread, NULL)) != 0)
log_errno(LOG_FATAL, "pthread_join() for thread #%d failed.", i);
sb_globals.num_running--;
}
sb_timer_stop(&sb_globals.exec_timer);
@ -886,8 +947,6 @@ static int run_test(sb_test_t *test)
pthread_mutex_destroy(&sb_globals.exec_mutex);
pthread_mutex_destroy(&thread_start_mutex);
/* finalize test */
if (test->ops.done != NULL)
(*(test->ops.done))();
@ -1118,7 +1177,7 @@ int main(int argc, char *argv[])
/* Parse command line arguments */
if (parse_arguments(argc,argv))
{
print_usage();
print_help();
exit(1);
}
@ -1131,7 +1190,7 @@ int main(int argc, char *argv[])
if (sb_globals.command == SB_COMMAND_NULL)
{
fprintf(stderr, "Missing required command argument.\n");
print_usage();
print_help();
exit(1);
}
@ -1155,7 +1214,7 @@ int main(int argc, char *argv[])
if (sb_globals.command == SB_COMMAND_HELP)
{
if (test == NULL)
print_usage();
print_help();
else
{
if (test->args != NULL)
@ -1175,7 +1234,7 @@ int main(int argc, char *argv[])
if (testname == NULL)
{
fprintf(stderr, "Missing required argument: --test.\n");
print_usage();
print_help();
exit(1);
}
@ -1213,9 +1272,6 @@ int main(int argc, char *argv[])
/* 'run' command */
current_test = test;
#ifdef HAVE_ALARM
signal(SIGALRM, sigalrm_handler);
#endif
if (run_test(test))
exit(1);

View File

@ -361,7 +361,7 @@ int file_prepare(void)
files[i] = sb_open(file_name);
if (!VALID_FILE(files[i]))
{
log_errno(LOG_FATAL, "Cannot open file");
log_errno(LOG_FATAL, "Cannot open file '%s'", file_name);
return 1;
}
}
@ -2057,11 +2057,10 @@ static FILE_DESCRIPTOR sb_open(const char *name)
return SB_INVALID_FILE;
#ifndef _WIN32
file = open(name, O_CREAT | O_RDWR | flags,
S_IRUSR | S_IWUSR);
file = open(name, O_RDWR | flags, S_IRUSR | S_IWUSR);
#else
file = CreateFile(name, GENERIC_READ|GENERIC_WRITE, 0, NULL,
OPEN_ALWAYS, flags, NULL);
file = CreateFile(name, GENERIC_READ|GENERIC_WRITE, 0, NULL, OPEN_EXISTING,
flags, NULL);
#endif
#ifdef HAVE_DIRECTIO