From 1cf5b33eda016ab14adbdf94d159a040ccbbf722 Mon Sep 17 00:00:00 2001 From: Alexey Kopytov Date: Sun, 29 Jan 2017 13:45:26 +0300 Subject: [PATCH] Move counters code from db_driver.c to sb_counter.c. --- src/Makefile.am | 7 +- src/db_driver.c | 204 +++++++++++------------------- src/db_driver.h | 40 +----- src/drivers/mysql/drv_mysql.c | 32 ++--- src/drivers/pgsql/drv_pgsql.c | 8 +- src/lua/internal/sysbench.sql.lua | 18 +-- src/sb_counter.c | 85 +++++++++++++ src/sb_counter.h | 78 ++++++++++++ src/sb_lua.c | 4 +- src/sb_util.h | 4 + src/sysbench.c | 14 +- src/sysbench.h | 8 +- src/tests/fileio/sb_fileio.c | 8 +- src/tests/memory/sb_memory.c | 8 +- 14 files changed, 298 insertions(+), 220 deletions(-) create mode 100644 src/sb_counter.c create mode 100644 src/sb_counter.h diff --git a/src/Makefile.am b/src/Makefile.am index a4ec6a4..cdb4dcd 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -48,10 +48,11 @@ sysbench_SOURCES = sysbench.c sysbench.h sb_timer.c sb_timer.h \ sb_options.c sb_options.h sb_logger.c sb_logger.h sb_list.h db_driver.h \ db_driver.c sb_histogram.c sb_histogram.h sb_rand.c sb_rand.h \ sb_thread.c sb_thread.h sb_barrier.c sb_barrier.h sb_lua.c \ -sb_lua.h lua/internal/sysbench.lua.h lua/internal/sysbench.sql.lua.h \ -lua/internal/sysbench.rand.lua.h lua/internal/sysbench.cmdline.lua.h sb_util.h \ +sb_lua.h sb_util.h sb_util.c sb_counter.h sb_counter.c \ +lua/internal/sysbench.lua.h lua/internal/sysbench.sql.lua.h \ +lua/internal/sysbench.rand.lua.h lua/internal/sysbench.cmdline.lua.h \ lua/internal/sysbench.compat.lua.h \ -xoroshiro128plus.h sb_util.c +xoroshiro128plus.h sysbench_LDADD = tests/fileio/libsbfileio.a tests/threads/libsbthreads.a \ tests/memory/libsbmemory.a tests/cpu/libsbcpu.a \ diff --git a/src/db_driver.c b/src/db_driver.c index ccd19c3..b8d91aa 100644 --- a/src/db_driver.c +++ b/src/db_driver.c @@ -51,13 +51,6 @@ /* Global variables */ db_globals_t db_globals CK_CC_CACHELINE; -/* per-thread stats */ -db_stats_t *thread_stats CK_CC_CACHELINE; - -/* Used in intermediate reports */ -static db_stats_t last_stats; - -/* Static variables */ static sb_list_t drivers; /* list of available DB drivers */ static uint8_t stats_enabled; @@ -69,8 +62,6 @@ static pthread_once_t db_global_once = PTHREAD_ONCE_INIT; static sb_timer_t *exec_timers; static sb_timer_t *fetch_timers; -pthread_mutex_t print_stats_mutex; - /* Static functions */ static int db_parse_arguments(void); @@ -94,29 +85,6 @@ static sb_arg_t db_args[] = SB_OPT_END }; -static inline uint64_t db_stat_val(db_stats_t stats, db_stat_type_t type) -{ - return ck_pr_load_64(&stats[type]); -} - -static inline void db_stat_merge(db_stats_t dst, db_stats_t src) -{ - for (size_t i = 0; i < DB_STAT_MAX; i++) - ck_pr_add_64(&dst[i], db_stat_val(src, i)); -} - -static inline void db_stat_copy(db_stats_t dst, db_stats_t src) -{ - for (size_t i = 0; i < DB_STAT_MAX; i++) - ck_pr_store_64(&dst[i], db_stat_val(src, i)); -} - -static inline uint64_t db_stat_diff(db_stats_t a, db_stats_t b, - db_stat_type_t type) -{ - return db_stat_val(a, type) - db_stat_val(b, type); -} - /* Register available database drivers and command line arguments */ int db_register(void) @@ -210,17 +178,6 @@ static bool check_print_stats(void) return rc; } -/* Initialize per-thread stats */ - -int db_thread_stat_init(void) -{ - thread_stats = sb_memalign(sb_globals.num_threads * sizeof(db_stats_t), - CK_MD_CACHELINE); - - return thread_stats == NULL; -} - - static void db_init(void) { if (SB_LIST_IS_EMPTY(&drivers)) @@ -232,8 +189,6 @@ static void db_init(void) if (db_parse_arguments()) return; - pthread_mutex_init(&print_stats_mutex, NULL); - /* Initialize timers if in debug mode */ if (db_globals.debug) { @@ -505,11 +460,11 @@ db_result_t *db_execute(db_stmt_t *stmt) con->error = con->driver->ops.execute(stmt, rs); - db_thread_stat_inc(con->thread_id, rs->stat_type); + sb_counter_inc(con->thread_id, rs->counter); if (SB_LIKELY(con->error == DB_ERROR_NONE)) { - if (rs->stat_type == DB_STAT_READ) + if (rs->counter == SB_CNT_READ) { con->state = DB_CONN_RESULT_SET; return rs; @@ -592,11 +547,11 @@ db_result_t *db_query(db_conn_t *con, const char *query, size_t len) con->error = con->driver->ops.query(con, query, len, rs); - db_thread_stat_inc(con->thread_id, rs->stat_type); + sb_counter_inc(con->thread_id, rs->counter); if (SB_LIKELY(con->error == DB_ERROR_NONE)) { - if (rs->stat_type == DB_STAT_READ) + if (rs->counter == SB_CNT_READ) { con->state = DB_CONN_RESULT_SET; return rs; @@ -720,11 +675,6 @@ void db_done(void) exec_timers = fetch_timers = NULL; } - free(thread_stats); - thread_stats = NULL; - - pthread_mutex_destroy(&print_stats_mutex); - SB_LIST_FOR_EACH(pos, &drivers) { drv = SB_LIST_ENTRY(pos, db_driver_t, listitem); @@ -983,88 +933,75 @@ int db_bulk_insert_done(db_conn_t *con) return 0; } -/* Print database-specific test stats */ - -void db_print_stats(sb_stat_t type) +static void db_print_stats_intermediate(void) { - double seconds; - unsigned int i; - sb_timer_t exec_timer; - sb_timer_t fetch_timer; - db_stats_t stats; - - SB_COMPILE_TIME_ASSERT(sizeof(db_stats_t) % CK_MD_CACHELINE == 0); + sb_counters_t cnt; /* Don't print stats if no drivers are used */ if (!check_print_stats()) return; - /* - Prevent interval and checkpoint reporting threads from using and updating - thread stats concurrently. - */ - pthread_mutex_lock(&print_stats_mutex); + const double percentile_val = + sb_histogram_get_pct_intermediate(&sb_latency_histogram, + sb_globals.percentile); + sb_counters_agg_intermediate(cnt); - /* Summarize per-thread counters */ - memset(&stats, 0, sizeof(db_stats_t)); - for (i = 0; i < sb_globals.num_threads; i++) - db_stat_merge(stats, thread_stats[i]); + const double seconds = NS2SEC(sb_timer_checkpoint(&sb_intermediate_timer)); - if (type == SB_STAT_INTERMEDIATE) + const uint64_t events = cnt[SB_CNT_EVENT]; + const uint64_t reads = cnt[SB_CNT_READ]; + const uint64_t writes = cnt[SB_CNT_WRITE]; + const uint64_t others = cnt[SB_CNT_OTHER]; + const uint64_t errors = cnt[SB_CNT_ERROR]; + const uint64_t reconnects = cnt[SB_CNT_RECONNECT]; + + log_timestamp(LOG_NOTICE, NS2SEC(sb_timer_value(&sb_exec_timer)), + "threads: %d tps: %4.2f " + "qps: %4.2f (r/w/o: %4.2f/%4.2f/%4.2f) " + "latency: %4.2fms (%u%%) errors/s: %4.2f " + "reconnects/s: %4.2f", + sb_globals.num_running, + events / seconds, + (reads + writes + others) / seconds, + reads / seconds, + writes / seconds, + others / seconds, + percentile_val, + sb_globals.percentile, + errors / seconds, + reconnects / seconds); + + if (sb_globals.tx_rate > 0) { - seconds = NS2SEC(sb_timer_checkpoint(&sb_intermediate_timer)); - - const double percentile_val = - sb_histogram_get_pct_intermediate(&sb_latency_histogram, - sb_globals.percentile); - const uint64_t reads = db_stat_diff(stats, last_stats, DB_STAT_READ); - const uint64_t writes = db_stat_diff(stats, last_stats, DB_STAT_WRITE); - const uint64_t others = db_stat_diff(stats, last_stats, DB_STAT_OTHER); - const uint64_t errors = db_stat_diff(stats, last_stats, DB_STAT_ERROR); - const uint64_t reconnects = db_stat_diff(stats, last_stats, - DB_STAT_RECONNECT); - - log_timestamp(LOG_NOTICE, NS2SEC(sb_timer_value(&sb_exec_timer)), - "threads: %d tps: %4.2f " - "qps: %4.2f (r/w/o: %4.2f/%4.2f/%4.2f) " - "latency: %4.2fms (%u%%) errors/s: %4.2f " - "reconnects/s: %4.2f", - sb_globals.num_running, - db_stat_diff(stats, last_stats, DB_STAT_TRX) / seconds, - (reads + writes + others) / seconds, - reads / seconds, - writes / seconds, - others / seconds, - percentile_val, - sb_globals.percentile, - errors / seconds, - reconnects / seconds); - if (sb_globals.tx_rate > 0) - { - pthread_mutex_lock(&event_queue_mutex); - - log_timestamp(LOG_NOTICE, seconds, - "queue length: %d, concurrency: %d", - sb_globals.event_queue_length, sb_globals.concurrency); - - pthread_mutex_unlock(&event_queue_mutex); - } - - db_stat_copy(last_stats, stats); - - goto end; + pthread_mutex_lock(&event_queue_mutex); + log_timestamp(LOG_NOTICE, seconds, + "queue length: %d, concurrency: %d", + sb_globals.event_queue_length, sb_globals.concurrency); + pthread_mutex_unlock(&event_queue_mutex); } - else if (type != SB_STAT_CUMULATIVE) - goto end; +} - seconds = NS2SEC(sb_timer_checkpoint(&sb_checkpoint_timer1)); - const uint64_t reads = db_stat_val(stats, DB_STAT_READ); - const uint64_t writes = db_stat_val(stats, DB_STAT_WRITE); - const uint64_t others = db_stat_val(stats, DB_STAT_OTHER); - const uint64_t transactions = db_stat_val(stats, DB_STAT_TRX); - const uint64_t errors = db_stat_val(stats, DB_STAT_ERROR); - const uint64_t reconnects = db_stat_val(stats, DB_STAT_RECONNECT); +static void db_print_stats_cumulative(void) +{ + sb_counters_t cnt; + sb_timer_t exec_timer; + sb_timer_t fetch_timer; + + /* Don't print stats if no drivers are used */ + if (!check_print_stats()) + return; + + sb_counters_agg_cumulative(cnt); + + const double seconds = NS2SEC(sb_timer_checkpoint(&sb_checkpoint_timer1)); + + const uint64_t events = cnt[SB_CNT_EVENT]; + const uint64_t reads = cnt[SB_CNT_READ]; + const uint64_t writes = cnt[SB_CNT_WRITE]; + const uint64_t others = cnt[SB_CNT_OTHER]; + const uint64_t errors = cnt[SB_CNT_ERROR]; + const uint64_t reconnects = cnt[SB_CNT_RECONNECT]; log_text(LOG_NOTICE, "OLTP test statistics:"); log_text(LOG_NOTICE, " queries performed:"); @@ -1077,7 +1014,7 @@ void db_print_stats(sb_stat_t type) log_text(LOG_NOTICE, " total: %" PRIu64, reads + writes + others); log_text(LOG_NOTICE, " transactions: %-6" PRIu64 - " (%.2f per sec.)", transactions, transactions / seconds); + " (%.2f per sec.)", events, events / seconds); log_text(LOG_NOTICE, " queries: %-6" PRIu64 " (%.2f per sec.)", reads + writes + others, (reads + writes + others) / seconds); @@ -1091,7 +1028,7 @@ void db_print_stats(sb_stat_t type) sb_timer_init(&exec_timer); sb_timer_init(&fetch_timer); - for (i = 0; i < sb_globals.num_threads; i++) + for (unsigned i = 0; i < sb_globals.num_threads; i++) { exec_timer = sb_timer_merge(&exec_timer, exec_timers + i); fetch_timer = sb_timer_merge(&fetch_timer, fetch_timers + i); @@ -1118,11 +1055,17 @@ void db_print_stats(sb_stat_t type) log_text(LOG_DEBUG, " total: %.4fs", NS2SEC(sb_timer_sum(&fetch_timer))); } +} - db_reset_stats(); -end: - pthread_mutex_unlock(&print_stats_mutex); +/* Print database-specific test stats */ + +void db_print_stats(sb_report_t type) +{ + if (type == SB_REPORT_INTERMEDIATE) + db_print_stats_intermediate(); + else + db_print_stats_cumulative(); } @@ -1130,9 +1073,6 @@ static void db_reset_stats(void) { unsigned int i; - memset(thread_stats, 0, sb_globals.num_threads * sizeof(db_stats_t)); - memset(last_stats, 0, sizeof(db_stats_t)); - /* So that intermediate stats are calculated from the current moment rather than from the previous intermediate report diff --git a/src/db_driver.h b/src/db_driver.h index 60b9790..6fbc663 100644 --- a/src/db_driver.h +++ b/src/db_driver.h @@ -26,8 +26,7 @@ #include "sysbench.h" #include "sb_list.h" #include "sb_histogram.h" -#include "sb_util.h" -#include "ck_pr.h" +#include "sb_counter.h" /* Prepared statements usage modes */ @@ -174,18 +173,6 @@ typedef struct pthread_mutex_t mutex; } db_driver_t; -/* Query type for statistics */ - -typedef enum { - DB_STAT_OTHER, - DB_STAT_READ, - DB_STAT_WRITE, - DB_STAT_TRX, - DB_STAT_ERROR, - DB_STAT_RECONNECT, - DB_STAT_MAX -} db_stat_type_t; - /* Row value definition */ typedef struct { @@ -205,7 +192,7 @@ typedef struct db_row typedef struct db_result { - db_stat_type_t stat_type; /* Statistical counter type */ + sb_counter_type_t counter; /* Statistical counter type */ uint32_t nrows; /* Number of affected rows */ uint32_t nfields; /* Number of fields */ struct db_stmt *statement; /* Pointer to prepared statement (if used) */ @@ -267,20 +254,11 @@ typedef struct db_stmt db_bind_t *bound_res; /* Array of bound results for emulated PS */ db_bind_t *bound_res_len; /* Length of the bound_res array */ char emulated; /* Should this statement be emulated? */ - db_stat_type_t stat_type; /* Query type */ + sb_counter_type_t counter; /* Query type */ void *ptr; /* Pointer to driver-specific data structure */ } db_stmt_t; -/* - sizeof(db_stats_t) must be multiple of CK_MD_CACHELINE to avoid cache - line sharing. -*/ -typedef uint64_t -db_stats_t[SB_ALIGN(DB_STAT_MAX * sizeof(uint64_t), CK_MD_CACHELINE) / - sizeof(uint64_t)]; - extern db_globals_t db_globals; -extern db_stats_t *thread_stats; /* Database abstraction layer calls */ @@ -332,17 +310,7 @@ int db_bulk_insert_next(db_conn_t *, const char *, size_t); int db_bulk_insert_done(db_conn_t *); /* Print database-specific test stats */ -void db_print_stats(sb_stat_t type); - -/* Initialize per-thread stats */ -int db_thread_stat_init(void); - -/* Increment a given stat counter for a connection */ -static inline void db_thread_stat_inc(int id, db_stat_type_t type) -{ - ck_pr_inc_64(&thread_stats[id][type]); -} - +void db_print_stats(sb_report_t type); /* DB drivers registrars */ diff --git a/src/drivers/mysql/drv_mysql.c b/src/drivers/mysql/drv_mysql.c index cc81763..a689d8d 100644 --- a/src/drivers/mysql/drv_mysql.c +++ b/src/drivers/mysql/drv_mysql.c @@ -529,8 +529,8 @@ int mysql_drv_prepare(db_stmt_t *stmt, const char *query, size_t len) } stmt->query = strdup(query); - stmt->stat_type = (mysql_stmt_field_count(mystmt) > 0) ? - DB_STAT_READ : DB_STAT_WRITE; + stmt->counter = (mysql_stmt_field_count(mystmt) > 0) ? + SB_CNT_READ : SB_CNT_WRITE; return 0; } @@ -697,7 +697,7 @@ static int mysql_drv_reconnect(db_conn_t *sb_con) static db_error_t check_error(db_conn_t *sb_con, const char *func, - const char *query, db_stat_type_t *type) + const char *query, sb_counter_type_t *counter) { sb_list_item_t *pos; unsigned int tmp; @@ -737,7 +737,7 @@ static db_error_t check_error(db_conn_t *sb_con, const char *func, case CR_TCP_CONNECTION: case CR_SERVER_LOST_EXTENDED: - *type = DB_STAT_RECONNECT; + *counter = SB_CNT_RECONNECT; return mysql_drv_reconnect(sb_con); @@ -746,7 +746,7 @@ static db_error_t check_error(db_conn_t *sb_con, const char *func, break; } - *type = DB_STAT_ERROR; + *counter = SB_CNT_ERROR; return DB_ERROR_IGNORABLE; } @@ -759,7 +759,7 @@ static db_error_t check_error(db_conn_t *sb_con, const char *func, log_text(LOG_FATAL, "%s returned error %u (%s)", func, error, sb_con->sql_errmsg); - *type = DB_STAT_ERROR; + *counter = SB_CNT_ERROR; return DB_ERROR_FATAL; } @@ -793,15 +793,15 @@ db_error_t mysql_drv_execute(db_stmt_t *stmt, db_result_t *rs) if (err) return check_error(con, "mysql_stmt_execute()", stmt->query, - &rs->stat_type); + &rs->counter); - if (stmt->stat_type != DB_STAT_READ) + if (stmt->counter != SB_CNT_READ) { rs->nrows = (uint32_t) mysql_stmt_affected_rows(stmt->ptr); DEBUG("mysql_stmt_affected_rows(%p) = %u", stmt->ptr, (unsigned) rs->nrows); - rs->stat_type = (rs->nrows > 0) ? DB_STAT_WRITE : DB_STAT_OTHER; + rs->counter = (rs->nrows > 0) ? SB_CNT_WRITE : SB_CNT_OTHER; return DB_ERROR_NONE; } @@ -811,10 +811,10 @@ db_error_t mysql_drv_execute(db_stmt_t *stmt, db_result_t *rs) if (err) { return check_error(con, "mysql_stmt_store_result()", NULL, - &rs->stat_type); + &rs->counter); } - rs->stat_type = stmt->stat_type; + rs->counter = stmt->counter; rs->nrows = (uint32_t) mysql_stmt_num_rows(stmt->ptr); DEBUG("mysql_stmt_num_rows(%p) = %u", rs->statement->ptr, (unsigned) (rs->nrows)); @@ -885,7 +885,7 @@ db_error_t mysql_drv_query(db_conn_t *sb_conn, const char *query, size_t len, DEBUG("mysql_real_query(%p, \"%s\", %zd) = %d", con, query, len, err); if (SB_UNLIKELY(err != 0)) - return check_error(sb_conn, "mysql_drv_query()", query, &rs->stat_type); + return check_error(sb_conn, "mysql_drv_query()", query, &rs->counter); /* Store results and get query type */ MYSQL_RES *res = mysql_store_result(con); @@ -899,19 +899,19 @@ db_error_t mysql_drv_query(db_conn_t *sb_conn, const char *query, size_t len, uint32_t nrows = (uint32_t) mysql_affected_rows(con); if (nrows > 0) { - rs->stat_type = DB_STAT_WRITE; + rs->counter = SB_CNT_WRITE; rs->nrows = nrows; } else - rs->stat_type = DB_STAT_OTHER; + rs->counter = SB_CNT_OTHER; return DB_ERROR_NONE; } - return check_error(sb_conn, "mysql_store_result()", NULL, &rs->stat_type); + return check_error(sb_conn, "mysql_store_result()", NULL, &rs->counter); } - rs->stat_type = DB_STAT_READ; + rs->counter = SB_CNT_READ; rs->ptr = (void *)res; rs->nrows = mysql_num_rows(res); diff --git a/src/drivers/pgsql/drv_pgsql.c b/src/drivers/pgsql/drv_pgsql.c index 389ae58..7fcd2cc 100644 --- a/src/drivers/pgsql/drv_pgsql.c +++ b/src/drivers/pgsql/drv_pgsql.c @@ -487,7 +487,7 @@ static db_error_t pgsql_check_status(db_conn_t *con, PGresult *pgres, case PGRES_TUPLES_OK: rs->nrows = PQntuples(pgres); rs->nfields = PQnfields(pgres); - rs->stat_type = DB_STAT_READ; + rs->counter = SB_CNT_READ; rc = DB_ERROR_NONE; @@ -495,14 +495,14 @@ static db_error_t pgsql_check_status(db_conn_t *con, PGresult *pgres, case PGRES_COMMAND_OK: rs->nrows = strtoul(PQcmdTuples(pgres), NULL, 10);; - rs->stat_type = (rs->nrows > 0) ? DB_STAT_WRITE : DB_STAT_OTHER; + rs->counter = (rs->nrows > 0) ? SB_CNT_WRITE : SB_CNT_OTHER; rc = DB_ERROR_NONE; break; case PGRES_FATAL_ERROR: rs->nrows = 0; - rs->stat_type = DB_STAT_ERROR; + rs->counter = SB_CNT_ERROR; con->sql_state = PQresultErrorField(pgres, PG_DIAG_SQLSTATE); con->sql_errmsg = PQresultErrorField(pgres, PG_DIAG_MESSAGE_PRIMARY); @@ -528,7 +528,7 @@ static db_error_t pgsql_check_status(db_conn_t *con, PGresult *pgres, default: rs->nrows = 0; - rs->stat_type = DB_STAT_ERROR; + rs->counter = SB_CNT_ERROR; rc = DB_ERROR_FATAL; } diff --git a/src/lua/internal/sysbench.sql.lua b/src/lua/internal/sysbench.sql.lua index f385b84..4fbe374 100644 --- a/src/lua/internal/sysbench.sql.lua +++ b/src/lua/internal/sysbench.sql.lua @@ -60,14 +60,14 @@ typedef struct typedef enum { - DB_STAT_OTHER, - DB_STAT_READ, - DB_STAT_WRITE, - DB_STAT_TRX, - DB_STAT_ERROR, - DB_STAT_RECONNECT, - DB_STAT_MAX -} sql_stat_type; + SB_CNT_OTHER, + SB_CNT_READ, + SB_CNT_WRITE, + SB_CNT_TRX, + SB_CNT_ERROR, + SB_CNT_RECONNECT, + SB_CNT_MAX +} sb_counter_type; typedef struct { @@ -91,7 +91,7 @@ typedef struct typedef struct { - sql_stat_type stat_type; /* Statistical counter type */ + sb_counter_type counter; /* Statistical counter type */ uint32_t nrows; /* Number of affected rows */ uint32_t nfields; /* Number of fields */ sql_statement *statement; /* Pointer to prepared statement (if used) */ diff --git a/src/sb_counter.c b/src/sb_counter.c new file mode 100644 index 0000000..e092ded --- /dev/null +++ b/src/sb_counter.c @@ -0,0 +1,85 @@ +/* + Copyright (C) 2017 Alexey Kopytov + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "sb_counter.h" +#include "sysbench.h" +#include "sb_util.h" +#include "ck_pr.h" + +static sb_counters_t last_intermediate_counters; +static sb_counters_t last_cumulative_counters; + +/* Initialize per-thread stats */ + +int sb_counters_init(void) +{ + SB_COMPILE_TIME_ASSERT(sizeof(sb_counters_t) % CK_MD_CACHELINE == 0); + + sb_counters = sb_memalign(sb_globals.num_threads * sizeof(sb_counters_t), + CK_MD_CACHELINE); + + memset(sb_counters, 0, sb_globals.num_threads * sizeof(sb_counters_t)); + + return sb_counters == NULL; +} + +void sb_counters_done(void) +{ + if (sb_counters != NULL) + { + free(sb_counters); + sb_counters = NULL; + } +} + +static void sb_counters_merge(sb_counters_t dst) +{ + for (size_t t = 0; t < SB_CNT_MAX; t++) + for (size_t i = 0; i < sb_globals.num_threads; i++) + dst[t] += sb_counter_val(i, t); +} + +static void sb_counters_checkpoint(sb_counters_t dst, sb_counters_t cp) +{ + for (size_t i = 0; i < SB_CNT_MAX; i++) + { + uint64_t tmp = cp[i]; + cp[i] = dst[i]; + dst[i] = dst[i] - tmp; + } +} + +void sb_counters_agg_intermediate(sb_counters_t val) +{ + memset(val, 0, sizeof(sb_counters_t)); + + sb_counters_merge(val); + sb_counters_checkpoint(val, last_intermediate_counters); +} + +void sb_counters_agg_cumulative(sb_counters_t val) +{ + memset(val, 0, sizeof(sb_counters_t)); + + sb_counters_merge(val); + sb_counters_checkpoint(val, last_cumulative_counters); +} diff --git a/src/sb_counter.h b/src/sb_counter.h new file mode 100644 index 0000000..a106a12 --- /dev/null +++ b/src/sb_counter.h @@ -0,0 +1,78 @@ +/* + Copyright (C) 2017 Alexey Kopytov + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifndef SB_COUNTER_H +#define SB_COUNTER_H + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#ifdef STDC_HEADERS +# include +#endif + +#include "sb_util.h" +#include "ck_pr.h" + +/* Statistic counter types */ + +typedef enum { + SB_CNT_OTHER, /* unknown type of queries */ + SB_CNT_READ, /* reads */ + SB_CNT_WRITE, /* writes */ + SB_CNT_EVENT, /* events */ + SB_CNT_ERROR, /* errors */ + SB_CNT_RECONNECT, /* reconnects */ + SB_CNT_MAX +} sb_counter_type_t; + +/* + sizeof(sb_counter_t) must be a multiple of CK_MD_CACHELINE to avoid cache + line sharing. +*/ +typedef uint64_t +sb_counters_t[SB_ALIGN(SB_CNT_MAX * sizeof(uint64_t), CK_MD_CACHELINE) / + sizeof(uint64_t)]; + +sb_counters_t *sb_counters CK_CC_CACHELINE; + +int sb_counters_init(void); + +void sb_counters_done(void); + +/* Return the current value for a given counter */ +static inline uint64_t sb_counter_val(int thread_id, sb_counter_type_t type) +{ + return ck_pr_load_64(&sb_counters[thread_id][type]); +} + +/* Increment a given stat counter for a given thread */ +static inline void sb_counter_inc(int thread_id, sb_counter_type_t type) +{ + ck_pr_store_64(&sb_counters[thread_id][type], + sb_counter_val(thread_id, type)+1); +} + +/* Return aggregate counter values since the last intermediate report */ +void sb_counters_agg_intermediate(sb_counters_t val); + +/* Return aggregate counter values since the last cumulative report */ +void sb_counters_agg_cumulative(sb_counters_t val); + +#endif diff --git a/src/sb_lua.c b/src/sb_lua.c index 9e03ddf..9b68715 100644 --- a/src/sb_lua.c +++ b/src/sb_lua.c @@ -133,7 +133,7 @@ static int sb_lua_op_done(void); static int sb_lua_op_thread_init(int); static int sb_lua_op_thread_run(int); static int sb_lua_op_thread_done(int); -static void sb_lua_op_print_stats(sb_stat_t type); +static void sb_lua_op_print_stats(sb_report_t type); static sb_operations_t lua_ops = { .init = sb_lua_op_init, @@ -449,7 +449,7 @@ int sb_lua_op_thread_done(int thread_id) return rc; } -void sb_lua_op_print_stats(sb_stat_t type) +void sb_lua_op_print_stats(sb_report_t type) { db_print_stats(type); } diff --git a/src/sb_util.h b/src/sb_util.h index 9030349..238b1f0 100644 --- a/src/sb_util.h +++ b/src/sb_util.h @@ -27,6 +27,10 @@ # include "config.h" #endif +#ifdef HAVE_UNISTD_H +# include +#endif + #include "ck_md.h" #include "ck_cc.h" diff --git a/src/sysbench.c b/src/sysbench.c index 85faf28..faf75e9 100644 --- a/src/sysbench.c +++ b/src/sysbench.c @@ -186,7 +186,7 @@ static void sigalrm_forced_shutdown_handler(int sig) "The --max-time limit has expired, forcing shutdown..."); if (current_test && current_test->ops.print_stats) - current_test->ops.print_stats(SB_STAT_CUMULATIVE); + current_test->ops.print_stats(SB_REPORT_CUMULATIVE); log_done(); @@ -548,7 +548,7 @@ void sb_event_stop(int thread_id) if (sb_globals.percentile > 0) sb_histogram_update(&sb_latency_histogram, NS2MS(value)); - db_thread_stat_inc(thread_id, DB_STAT_TRX); + sb_counter_inc(thread_id, SB_CNT_EVENT); if (sb_globals.tx_rate > 0) { @@ -746,7 +746,7 @@ static void *report_thread_proc(void *arg) */ pthread_mutex_lock(&report_interval_mutex); if (sb_globals.report_interval > 0) - current_test->ops.print_stats(SB_STAT_INTERMEDIATE); + current_test->ops.print_stats(SB_REPORT_INTERMEDIATE); pthread_mutex_unlock(&report_interval_mutex); curr_ns = sb_timer_value(&sb_exec_timer); @@ -800,7 +800,7 @@ static void *checkpoints_thread_proc(void *arg) SB_THREAD_MUTEX_LOCK(); log_timestamp(LOG_NOTICE, NS2SEC(sb_timer_value(&sb_exec_timer)), "Checkpoint report:"); - current_test->ops.print_stats(SB_STAT_CUMULATIVE); + current_test->ops.print_stats(SB_REPORT_CUMULATIVE); print_global_stats(); SB_THREAD_MUTEX_UNLOCK(); } @@ -973,7 +973,7 @@ static int run_test(sb_test_t *test) /* print test-specific stats */ if (test->ops.print_stats != NULL && !sb_globals.error) - test->ops.print_stats(SB_STAT_CUMULATIVE); + test->ops.print_stats(SB_REPORT_CUMULATIVE); pthread_mutex_destroy(&sb_globals.exec_mutex); @@ -1173,7 +1173,7 @@ int main(int argc, char *argv[]) } /* Initialize global variables and logger */ - if (init() || log_init() || db_thread_stat_init()) + if (init() || log_init() || sb_counters_init()) return EXIT_FAILURE; print_header(); @@ -1288,6 +1288,8 @@ int main(int argc, char *argv[]) db_done(); + sb_counters_done(); + log_done(); sb_options_done(); diff --git a/src/sysbench.h b/src/sysbench.h index 6eac6cb..dc5de59 100644 --- a/src/sysbench.h +++ b/src/sysbench.h @@ -86,9 +86,9 @@ typedef struct typedef enum { - SB_STAT_INTERMEDIATE, - SB_STAT_CUMULATIVE -} sb_stat_t; + SB_REPORT_INTERMEDIATE, + SB_REPORT_CUMULATIVE +} sb_report_t; /* Commands */ @@ -104,7 +104,7 @@ typedef int sb_op_thread_run(int); typedef void sb_op_print_mode(void); typedef sb_event_t sb_op_next_event(int); typedef int sb_op_execute_event(sb_event_t *, int); -typedef void sb_op_print_stats(sb_stat_t); +typedef void sb_op_print_stats(sb_report_t); typedef int sb_op_thread_done(int); typedef int sb_op_cleanup(void); typedef int sb_op_done(void); diff --git a/src/tests/fileio/sb_fileio.c b/src/tests/fileio/sb_fileio.c index d78ac05..9e0f628 100644 --- a/src/tests/fileio/sb_fileio.c +++ b/src/tests/fileio/sb_fileio.c @@ -252,7 +252,7 @@ static int file_execute_event(sb_event_t *, int); static int file_thread_done(int); #endif static int file_done(void); -static void file_print_stats(sb_stat_t); +static void file_print_stats(sb_report_t); static sb_test_t fileio_test = { @@ -836,7 +836,7 @@ void file_print_mode(void) /* Print test statistics */ -void file_print_stats(sb_stat_t type) +void file_print_stats(sb_report_t type) { double seconds; unsigned long long diff_read; @@ -844,7 +844,7 @@ void file_print_stats(sb_stat_t type) unsigned long long diff_other_ops; switch (type) { - case SB_STAT_INTERMEDIATE: + case SB_REPORT_INTERMEDIATE: { SB_THREAD_MUTEX_LOCK(); @@ -876,7 +876,7 @@ void file_print_stats(sb_stat_t type) break; } - case SB_STAT_CUMULATIVE: + case SB_REPORT_CUMULATIVE: seconds = NS2SEC(sb_timer_checkpoint(&sb_checkpoint_timer1)); log_text(LOG_NOTICE, "\n" diff --git a/src/tests/memory/sb_memory.c b/src/tests/memory/sb_memory.c index c223724..aba9b7f 100644 --- a/src/tests/memory/sb_memory.c +++ b/src/tests/memory/sb_memory.c @@ -58,7 +58,7 @@ static int memory_init(void); static void memory_print_mode(void); static sb_event_t memory_next_event(int); static int memory_execute_event(sb_event_t *, int); -static void memory_print_stats(sb_stat_t type); +static void memory_print_stats(sb_report_t type); static sb_test_t memory_test = { @@ -340,13 +340,13 @@ void memory_print_mode(void) } -void memory_print_stats(sb_stat_t type) +void memory_print_stats(sb_report_t type) { double seconds; const double megabyte = 1024.0 * 1024.0; switch (type) { - case SB_STAT_INTERMEDIATE: + case SB_REPORT_INTERMEDIATE: SB_THREAD_MUTEX_LOCK(); seconds = NS2SEC(sb_timer_checkpoint(&sb_intermediate_timer)); @@ -358,7 +358,7 @@ void memory_print_stats(sb_stat_t type) break; - case SB_STAT_CUMULATIVE: + case SB_REPORT_CUMULATIVE: seconds = NS2SEC(sb_timer_checkpoint(&sb_checkpoint_timer1)); log_text(LOG_NOTICE, "Operations performed: %d (%8.2f ops/sec)\n",