Reimplement tx_rate with lockless bounded FIFO provided by ConcurrencyKit.

Alexey Kopytov
2017-01-30 21:26:55 +03:00
parent d3e28b3dc2
commit efb168643f
4 changed files with 77 additions and 74 deletions
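
For readers new to ConcurrencyKit: the diff below replaces the mutex/condvar event queue with ck_ring used in single-producer/multiple-consumer (SPMC) mode. A minimal, self-contained sketch of that pattern, with illustrative names and sizes rather than sysbench's, could look like this:

#include <ck_ring.h>
#include <ck_pr.h>

#include <inttypes.h>
#include <stdbool.h>
#include <stdio.h>

#define RING_SIZE 1024                  /* ck_ring requires a power of 2 */

static ck_ring_t ring;
static ck_ring_buffer_t ring_buf[RING_SIZE];
static uint64_t slots[RING_SIZE];       /* storage the ring entries point to */

/* Single producer: publish a timestamp and advance the slot index. */
static bool produce(unsigned int *idx, uint64_t ts)
{
  slots[*idx] = ts;
  if (!ck_ring_enqueue_spmc(&ring, ring_buf, &slots[*idx]))
    return false;                       /* ring is full: consumers fell behind */
  *idx = (*idx + 1) & (RING_SIZE - 1);
  return true;
}

/* Any number of consumers: spin until an entry becomes available. */
static uint64_t consume(void)
{
  void *ptr;

  while (!ck_ring_dequeue_spmc(&ring, ring_buf, &ptr))
    ck_pr_stall();                      /* CPU-friendly busy-wait hint */

  return *(uint64_t *) ptr;
}

int main(void)
{
  unsigned int idx = 0;

  ck_ring_init(&ring, RING_SIZE);
  produce(&idx, 42);
  printf("%" PRIu64 "\n", consume());

  return 0;
}

The ring stores void pointers, so the producer keeps a side array of timestamps and enqueues pointers into it; that is the same shape the commit uses with queue_array and queue_ring_buffer.
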

View File

@@ -956,11 +956,9 @@ void db_report_intermediate(sb_stat_t *stat)
if (sb_globals.tx_rate > 0)
{
pthread_mutex_lock(&event_queue_mutex);
log_timestamp(LOG_NOTICE, seconds,
"queue length: %d, concurrency: %d",
sb_globals.event_queue_length, sb_globals.concurrency);
pthread_mutex_unlock(&event_queue_mutex);
log_timestamp(LOG_NOTICE, stat->time_total,
"queue length: %" PRIu64", concurrency: %" PRIu64,
stat->queue_length, stat->concurrency);
}
}

View File

@@ -1514,6 +1514,12 @@ static void sb_lua_report_intermediate(sb_stat_t *stat)
stat_to_lua_table(L, stat);
/*
The following is only available for intermediate reports with tx_rate > 0
*/
stat_to_number(queue_length);
stat_to_number(concurrency);
if (lua_pcall(L, 1, 0, 0))
{
call_error(L, REPORT_INTERMEDIATE_HOOK);

View File

@@ -75,21 +75,16 @@
#include "sb_barrier.h"
#include "ck_cc.h"
#include "ck_ring.h"
#define VERSION_STRING PACKAGE" "PACKAGE_VERSION SB_GIT_SHA
/* Maximum queue length for the tx-rate mode */
#define MAX_QUEUE_LEN 100000
/* Maximum queue length for the tx-rate mode. Must be a power of 2 */
#define MAX_QUEUE_LEN 131072
/* Wait at most this number of seconds for worker threads to initialize */
#define THREAD_INIT_TIMEOUT 30
/* Event queue data type for the tx-rate mode */
typedef struct {
unsigned long long event_time;
sb_list_item_t listitem;
} event_queue_elem_t;
/* General options */
sb_arg_t general_args[] =
{
@@ -129,14 +124,13 @@ sb_test_t *current_test;
static sb_barrier_t thread_start_barrier;
/* structures to handle queue of events, needed for tx_rate mode */
pthread_mutex_t event_queue_mutex;
static sb_list_t event_queue;
static pthread_cond_t event_queue_cv;
static event_queue_elem_t queue_array[MAX_QUEUE_LEN];
static uint64_t queue_array[MAX_QUEUE_LEN] CK_CC_CACHELINE;
static ck_ring_buffer_t queue_ring_buffer[MAX_QUEUE_LEN] CK_CC_CACHELINE;
static ck_ring_t queue_ring CK_CC_CACHELINE;
static int queue_is_full;
static int queue_is_full CK_CC_CACHELINE;
static int report_thread_created;
static int report_thread_created CK_CC_CACHELINE;
static int checkpoints_thread_created;
static int eventgen_thread_created;
@@ -228,6 +222,12 @@ static void report_intermediate(void)
stat.time_interval = NS2SEC(sb_timer_checkpoint(&sb_intermediate_timer));
if (sb_globals.tx_rate > 0)
{
stat.queue_length = ck_ring_size(&queue_ring);
stat.concurrency = ck_pr_load_int(&sb_globals.concurrency);
}
if (current_test && current_test->ops.report_intermediate)
current_test->ops.report_intermediate(&stat);
else
@@ -688,6 +688,28 @@ bool sb_more_events(int thread_id)
ck_pr_faa_64(&sb_globals.nevents, 1) >= sb_globals.max_requests)
return false;
/* If we are in tx_rate mode, we take events from queue */
if (sb_globals.tx_rate > 0)
{
void *ptr;
while (!ck_ring_dequeue_spmc(&queue_ring, queue_ring_buffer, &ptr) &&
!ck_pr_load_int(&queue_is_full))
ck_pr_stall();
if (ck_pr_load_int(&queue_is_full))
{
log_text(LOG_FATAL, "Event queue is full. Terminating the worker thread");
return false;
}
ck_pr_inc_int(&sb_globals.concurrency);
timers[thread_id].queue_time = sb_timer_value(&sb_exec_timer) -
((uint64_t *) ptr)[0];
}
return true;
}
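
The hunk above pairs the lock-free dequeue with an escape hatch: workers spin on the ring but give up once the generator has flagged the queue as full. A self-contained sketch of that consumer-side shape (names are illustrative, not sysbench's):

#include <ck_ring.h>
#include <ck_pr.h>

#include <stdbool.h>
#include <stdint.h>

#define RING_SIZE 1024

static ck_ring_t ring;
static ck_ring_buffer_t ring_buf[RING_SIZE];
static int queue_full;                  /* set by the producer on overflow */

/* Consumer side: wait (by spinning) until an event arrives, but give up as
   soon as the producer has declared the queue full, so workers do not keep
   spinning after a fatal overflow. */
static bool next_timestamp(uint64_t *ts)
{
  void *ptr;

  while (!ck_ring_dequeue_spmc(&ring, ring_buf, &ptr) &&
         !ck_pr_load_int(&queue_full))
    ck_pr_stall();

  if (ck_pr_load_int(&queue_full))
    return false;                       /* shut this worker down */

  *ts = *(uint64_t *) ptr;
  return true;
}
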
@@ -698,43 +720,33 @@ bool sb_more_events(int thread_id)
sb_event_t sb_next_event(sb_test_t *test, int thread_id)
{
sb_list_item_t *pos;
event_queue_elem_t *elem;
unsigned long long queue_start_time = 0;
uint64_t queue_start_time = 0;
/* If we are in tx_rate mode, we take events from queue */
if (sb_globals.tx_rate > 0)
{
if (queue_is_full)
void *ptr;
while (!ck_ring_dequeue_spmc(&queue_ring, queue_ring_buffer, &ptr) &&
!ck_pr_load_int(&queue_is_full))
ck_pr_stall();
if (ck_pr_load_int(&queue_is_full))
{
log_text(LOG_FATAL, "Event queue is full.");
log_text(LOG_FATAL, "Event queue is full. Terminating the worker thread");
sb_event_t event;
event.type = SB_REQ_TYPE_NULL;
return event;
}
pthread_mutex_lock(&event_queue_mutex);
while(!sb_globals.event_queue_length)
pthread_cond_wait(&event_queue_cv, &event_queue_mutex);
SB_LIST_ONCE(pos, &event_queue)
{
elem = SB_LIST_ENTRY(pos, event_queue_elem_t, listitem);
queue_start_time = elem->event_time;
queue_start_time = ((uint64_t *) ptr)[0];
SB_LIST_DELETE(pos);
sb_globals.event_queue_length--;
}
sb_globals.concurrency++;
pthread_mutex_unlock(&event_queue_mutex);
ck_pr_inc_int(&sb_globals.concurrency);
timers[thread_id].queue_time = sb_timer_value(&sb_exec_timer) -
queue_start_time;
}
return test->ops.next_event(thread_id);
@@ -775,9 +787,7 @@ void sb_event_stop(int thread_id)
if (sb_globals.tx_rate > 0)
{
pthread_mutex_lock(&event_queue_mutex);
sb_globals.concurrency--;
pthread_mutex_unlock(&event_queue_mutex);
ck_pr_dec_int(&sb_globals.concurrency);
}
}
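
As the hunk above shows, the concurrency counter goes from a mutex-protected increment/decrement to plain ck_pr atomics (ck_pr_inc_int / ck_pr_dec_int / ck_pr_load_int). A minimal sketch of that counter idiom, again with hypothetical names:

#include <ck_pr.h>

static int in_flight;                   /* events currently being processed */

static void event_started(void)
{
  ck_pr_inc_int(&in_flight);            /* atomic increment, no mutex */
}

static void event_finished(void)
{
  ck_pr_dec_int(&in_flight);
}

static int in_flight_snapshot(void)
{
  return ck_pr_load_int(&in_flight);    /* racy but adequate for statistics */
}
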
@@ -873,7 +883,8 @@ static void *eventgen_thread_proc(void *arg)
/* Initialize thread-local RNG state */
sb_rand_thread_init();
SB_LIST_INIT(&event_queue);
ck_ring_init(&queue_ring, MAX_QUEUE_LEN);
i = 0;
log_text(LOG_DEBUG, "Event generating thread started");
@@ -904,30 +915,20 @@ static void *eventgen_thread_proc(void *arg)
pause_ns = next_ns - curr_ns;
usleep(pause_ns / 1000);
}
else
/* Enqueue a new event */
queue_array[i++] = sb_timer_value(&sb_exec_timer);
if (ck_ring_enqueue_spmc(&queue_ring, queue_ring_buffer,
&queue_array[i]) == false)
{
log_timestamp(LOG_DEBUG, NS2SEC(curr_ns),
"Event generation thread is too slow");
}
queue_array[i].event_time = sb_timer_value(&sb_exec_timer);
pthread_mutex_lock(&event_queue_mutex);
SB_LIST_ADD_TAIL(&queue_array[i].listitem, &event_queue);
sb_globals.event_queue_length++;
if (sb_globals.event_queue_length >= MAX_QUEUE_LEN)
queue_is_full = 1;
pthread_cond_signal(&event_queue_cv);
pthread_mutex_unlock(&event_queue_mutex);
if (queue_is_full)
{
log_text(LOG_FATAL, "Event queue is full.");
ck_pr_store_int(&queue_is_full, 1);
log_text(LOG_FATAL,
"The event queue is full. This means the worker threads are "
"unable to keep up with the specified event generation rate");
return NULL;
}
i++;
if (i >= MAX_QUEUE_LEN)
if (i >= MAX_QUEUE_LEN - 1)
i = 0;
}
@@ -1085,9 +1086,6 @@ static int run_test(sb_test_t *test)
pthread_mutex_init(&sb_globals.exec_mutex, NULL);
pthread_mutex_init(&event_queue_mutex, NULL);
pthread_cond_init(&event_queue_cv, NULL);
sb_globals.event_queue_length = 0;
queue_is_full = 0;
sb_globals.num_running = 0;

View File

@@ -105,6 +105,9 @@ typedef struct {
uint64_t other; /* Number of other operations */
uint64_t errors; /* Number of ignored errors */
uint64_t reconnects; /* Number of reconnects to server */
uint64_t queue_length; /* Event queue length (tx_rate-only) */
uint64_t concurrency; /* Number of in-flight events (tx_rate-only) */
} sb_stat_t;
/* Commands */
@@ -192,13 +195,11 @@ typedef struct
unsigned char debug; /* debug flag */
unsigned int timeout; /* forced shutdown timeout */
unsigned char validate; /* validation flag */
unsigned char verbosity; /* log verbosity */
int event_queue_length; /* length of request queue when tx-rate is
used */
int concurrency; /* number of concurrent requests when tx-rate is
used */
/* 1 when forced shutdown is in progress, 0 otherwise */
int force_shutdown; /* whether we must force test shutdown */
unsigned char verbosity CK_CC_CACHELINE; /* log verbosity */
int concurrency CK_CC_CACHELINE; /* number of concurrent requests
when tx-rate is used */
int force_shutdown CK_CC_CACHELINE; /* whether we must force test
shutdown */
int forced_shutdown_in_progress;
uint64_t nevents CK_CC_CACHELINE; /* event counter */
} sb_globals_t;
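
The CK_CC_CACHELINE annotations added throughout the header keep hot, independently updated variables on separate cache lines so the generator, workers, and report thread do not false-share them. A tiny sketch of the idiom with hypothetical counters:

#include <ck_cc.h>

/* Two counters written by different threads. Without the alignment
   attribute they could land on the same cache line, and every write by
   one thread would invalidate the other thread's cached copy. */
static int produced CK_CC_CACHELINE;
static int consumed CK_CC_CACHELINE;
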