diff --git a/src/db_driver.c b/src/db_driver.c index e7a5a79..f0691db 100644 --- a/src/db_driver.c +++ b/src/db_driver.c @@ -956,11 +956,9 @@ void db_report_intermediate(sb_stat_t *stat) if (sb_globals.tx_rate > 0) { - pthread_mutex_lock(&event_queue_mutex); - log_timestamp(LOG_NOTICE, seconds, - "queue length: %d, concurrency: %d", - sb_globals.event_queue_length, sb_globals.concurrency); - pthread_mutex_unlock(&event_queue_mutex); + log_timestamp(LOG_NOTICE, stat->time_total, + "queue length: %" PRIu64", concurrency: %" PRIu64, + stat->queue_length, stat->concurrency); } } diff --git a/src/sb_lua.c b/src/sb_lua.c index c23f9d6..0cbc6e9 100644 --- a/src/sb_lua.c +++ b/src/sb_lua.c @@ -1514,6 +1514,12 @@ static void sb_lua_report_intermediate(sb_stat_t *stat) stat_to_lua_table(L, stat); + /* + The following is only available for intermediate reports with tx_rate > 0 + */ + stat_to_number(queue_length); + stat_to_number(concurrency); + if (lua_pcall(L, 1, 0, 0)) { call_error(L, REPORT_INTERMEDIATE_HOOK); diff --git a/src/sysbench.c b/src/sysbench.c index 7cfd179..6f0e50f 100644 --- a/src/sysbench.c +++ b/src/sysbench.c @@ -75,21 +75,16 @@ #include "sb_barrier.h" #include "ck_cc.h" +#include "ck_ring.h" #define VERSION_STRING PACKAGE" "PACKAGE_VERSION SB_GIT_SHA -/* Maximum queue length for the tx-rate mode */ -#define MAX_QUEUE_LEN 100000 +/* Maximum queue length for the tx-rate mode. Must be a power of 2 */ +#define MAX_QUEUE_LEN 131072 /* Wait at most this number of seconds for worker threads to initialize */ #define THREAD_INIT_TIMEOUT 30 -/* Event queue data type for the tx-rate mode */ -typedef struct { - unsigned long long event_time; - sb_list_item_t listitem; -} event_queue_elem_t; - /* General options */ sb_arg_t general_args[] = { @@ -129,14 +124,13 @@ sb_test_t *current_test; static sb_barrier_t thread_start_barrier; /* structures to handle queue of events, needed for tx_rate mode */ -pthread_mutex_t event_queue_mutex; -static sb_list_t event_queue; -static pthread_cond_t event_queue_cv; -static event_queue_elem_t queue_array[MAX_QUEUE_LEN]; +static uint64_t queue_array[MAX_QUEUE_LEN] CK_CC_CACHELINE; +static ck_ring_buffer_t queue_ring_buffer[MAX_QUEUE_LEN] CK_CC_CACHELINE; +static ck_ring_t queue_ring CK_CC_CACHELINE; -static int queue_is_full; +static int queue_is_full CK_CC_CACHELINE; -static int report_thread_created; +static int report_thread_created CK_CC_CACHELINE; static int checkpoints_thread_created; static int eventgen_thread_created; @@ -228,6 +222,12 @@ static void report_intermediate(void) stat.time_interval = NS2SEC(sb_timer_checkpoint(&sb_intermediate_timer)); + if (sb_globals.tx_rate > 0) + { + stat.queue_length = ck_ring_size(&queue_ring); + stat.concurrency = ck_pr_load_int(&sb_globals.concurrency); + } + if (current_test && current_test->ops.report_intermediate) current_test->ops.report_intermediate(&stat); else @@ -688,6 +688,28 @@ bool sb_more_events(int thread_id) ck_pr_faa_64(&sb_globals.nevents, 1) >= sb_globals.max_requests) return false; + /* If we are in tx_rate mode, we take events from queue */ + if (sb_globals.tx_rate > 0) + { + void *ptr; + + while (!ck_ring_dequeue_spmc(&queue_ring, queue_ring_buffer, &ptr) && + !ck_pr_load_int(&queue_is_full)) + ck_pr_stall(); + + if (ck_pr_load_int(&queue_is_full)) + { + log_text(LOG_FATAL, "Event queue is full. Terminating the worker thread"); + + return false; + } + + ck_pr_inc_int(&sb_globals.concurrency); + + timers[thread_id].queue_time = sb_timer_value(&sb_exec_timer) - + ((uint64_t *) ptr)[0]; + } + return true; } @@ -698,43 +720,33 @@ bool sb_more_events(int thread_id) sb_event_t sb_next_event(sb_test_t *test, int thread_id) { - sb_list_item_t *pos; - event_queue_elem_t *elem; - unsigned long long queue_start_time = 0; + uint64_t queue_start_time = 0; /* If we are in tx_rate mode, we take events from queue */ if (sb_globals.tx_rate > 0) { - if (queue_is_full) + void *ptr; + + while (!ck_ring_dequeue_spmc(&queue_ring, queue_ring_buffer, &ptr) && + !ck_pr_load_int(&queue_is_full)) + ck_pr_stall(); + + if (ck_pr_load_int(&queue_is_full)) { - log_text(LOG_FATAL, "Event queue is full."); + log_text(LOG_FATAL, "Event queue is full. Terminating the worker thread"); sb_event_t event; event.type = SB_REQ_TYPE_NULL; return event; } - pthread_mutex_lock(&event_queue_mutex); - while(!sb_globals.event_queue_length) - pthread_cond_wait(&event_queue_cv, &event_queue_mutex); - SB_LIST_ONCE(pos, &event_queue) - { - elem = SB_LIST_ENTRY(pos, event_queue_elem_t, listitem); - queue_start_time = elem->event_time;; + queue_start_time = ((uint64_t *) ptr)[0]; - SB_LIST_DELETE(pos); - - sb_globals.event_queue_length--; - } - - sb_globals.concurrency++; - - pthread_mutex_unlock(&event_queue_mutex); + ck_pr_inc_int(&sb_globals.concurrency); timers[thread_id].queue_time = sb_timer_value(&sb_exec_timer) - queue_start_time; - } return test->ops.next_event(thread_id); @@ -775,9 +787,7 @@ void sb_event_stop(int thread_id) if (sb_globals.tx_rate > 0) { - pthread_mutex_lock(&event_queue_mutex); - sb_globals.concurrency--; - pthread_mutex_unlock(&event_queue_mutex); + ck_pr_dec_int(&sb_globals.concurrency); } } @@ -873,7 +883,8 @@ static void *eventgen_thread_proc(void *arg) /* Initialize thread-local RNG state */ sb_rand_thread_init(); - SB_LIST_INIT(&event_queue); + ck_ring_init(&queue_ring, MAX_QUEUE_LEN); + i = 0; log_text(LOG_DEBUG, "Event generating thread started"); @@ -904,30 +915,20 @@ static void *eventgen_thread_proc(void *arg) pause_ns = next_ns - curr_ns; usleep(pause_ns / 1000); } - else + + /* Enqueue a new event */ + queue_array[i++] = sb_timer_value(&sb_exec_timer); + if (ck_ring_enqueue_spmc(&queue_ring, queue_ring_buffer, + &queue_array[i]) == false) { - log_timestamp(LOG_DEBUG, NS2SEC(curr_ns), - "Event generation thread is too slow"); - } - - - queue_array[i].event_time = sb_timer_value(&sb_exec_timer); - pthread_mutex_lock(&event_queue_mutex); - SB_LIST_ADD_TAIL(&queue_array[i].listitem, &event_queue); - sb_globals.event_queue_length++; - if (sb_globals.event_queue_length >= MAX_QUEUE_LEN) - queue_is_full = 1; - pthread_cond_signal(&event_queue_cv); - pthread_mutex_unlock(&event_queue_mutex); - - if (queue_is_full) - { - log_text(LOG_FATAL, "Event queue is full."); + ck_pr_store_int(&queue_is_full, 1); + log_text(LOG_FATAL, + "The event queue is full. This means the worker threads are " + "unable to keep up with the specified event generation rate"); return NULL; } - i++; - if (i >= MAX_QUEUE_LEN) + if (i >= MAX_QUEUE_LEN - 1) i = 0; } @@ -1085,9 +1086,6 @@ static int run_test(sb_test_t *test) pthread_mutex_init(&sb_globals.exec_mutex, NULL); - pthread_mutex_init(&event_queue_mutex, NULL); - pthread_cond_init(&event_queue_cv, NULL); - sb_globals.event_queue_length = 0; queue_is_full = 0; sb_globals.num_running = 0; diff --git a/src/sysbench.h b/src/sysbench.h index f0a41d6..53d93f7 100644 --- a/src/sysbench.h +++ b/src/sysbench.h @@ -105,6 +105,9 @@ typedef struct { uint64_t other; /* Number of other operations */ uint64_t errors; /* Number of ignored errors */ uint64_t reconnects; /* Number of reconnects to server */ + + uint64_t queue_length; /* Event queue length (tx_rate-only) */ + uint64_t concurrency; /* Number of in-flight events (tx_rate-only) */ } sb_stat_t; /* Commands */ @@ -192,13 +195,11 @@ typedef struct unsigned char debug; /* debug flag */ unsigned int timeout; /* forced shutdown timeout */ unsigned char validate; /* validation flag */ - unsigned char verbosity; /* log verbosity */ - int event_queue_length; /* length of request queue when tx-rate is - used */ - int concurrency; /* number of concurrent requests when tx-rate is - used */ - /* 1 when forced shutdown is in progress, 0 otherwise */ - int force_shutdown; /* whether we must force test shutdown */ + unsigned char verbosity CK_CC_CACHELINE; /* log verbosity */ + int concurrency CK_CC_CACHELINE; /* number of concurrent requests + when tx-rate is used */ + int force_shutdown CK_CC_CACHELINE; /* whether we must force test + shutdown */ int forced_shutdown_in_progress; uint64_t nevents CK_CC_CACHELINE; /* event counter */ } sb_globals_t;