Implement error propagation.

SQL query errors can now be propagated to Lua scripts. This is a
prerequisite for custom error hooks.
This commit is contained in:
Alexey Kopytov
2017-01-20 02:43:08 +03:00
parent d767dba581
commit dbb7bbfcd1
14 changed files with 469 additions and 296 deletions

View File

@ -400,7 +400,7 @@ int db_connection_close(db_conn_t *con)
if (con->state == DB_CONN_INVALID)
{
log_text(LOG_WARNING, "attempt to close an already closed connection");
log_text(LOG_ALERT, "attempt to close an already closed connection");
return 0;
}
else if(con->state == DB_CONN_RESULT_SET)
@ -500,6 +500,8 @@ db_result_t *db_execute(db_stmt_t *stmt)
db_result_t *rs = &con->rs;
int rc;
con->drv_errno = 0;
if (con->state == DB_CONN_INVALID)
{
log_text(LOG_ALERT, "attempt to use an already closed connection");
@ -513,7 +515,7 @@ db_result_t *db_execute(db_stmt_t *stmt)
rs->statement = stmt;
con->db_errno = con->driver->ops.execute(stmt, rs);
con->sql_errno = con->driver->ops.execute(stmt, rs);
db_thread_stat_inc(con->thread_id, rs->stat_type);
@ -574,24 +576,38 @@ db_result_t *db_query(db_conn_t *con, const char *query, size_t len)
db_result_t *rs = &con->rs;
int rc;
con->drv_errno = 0;
if (con->state == DB_CONN_INVALID)
{
log_text(LOG_ALERT, "attempt to use an already closed connection");
con->sql_errno = DB_ERROR_FATAL;
return NULL;
}
else if (con->state == DB_CONN_RESULT_SET &&
(rc = db_free_results_int(con)) != 0)
{
con->sql_errno = DB_ERROR_FATAL;
return NULL;
}
con->db_errno = con->driver->ops.query(con, query, len, rs);
con->sql_errno = con->driver->ops.query(con, query, len, rs);
db_thread_stat_inc(con->thread_id, rs->stat_type);
con->state = DB_CONN_RESULT_SET;
if (SB_LIKELY(con->sql_errno == DB_ERROR_NONE))
{
if (rs->stat_type == DB_STAT_READ)
{
con->state = DB_CONN_RESULT_SET;
return rs;
}
con->state = DB_CONN_READY;
return rs;
return NULL;
}
return NULL;
}
@ -627,12 +643,12 @@ int db_free_results(db_result_t *rs)
if (con->state == DB_CONN_INVALID)
{
log_text(LOG_ALERT, "attempt to use an already closed connection");
return 0;
return 1;
}
else if (con->state != DB_CONN_RESULT_SET)
{
log_text(LOG_ALERT, "attempt to free an invalid result set");
return 0;
return 1;
}
return db_free_results_int(con);
@ -927,7 +943,8 @@ static int db_bulk_do_insert(db_conn_t *con, int is_last)
if (!con->bulk_cnt)
return 0;
if (db_query(con, con->bulk_buffer, con->bulk_ptr) == NULL)
if (db_query(con, con->bulk_buffer, con->bulk_ptr) == NULL &&
con->sql_errno != DB_ERROR_NONE)
return 1;
@ -937,7 +954,8 @@ static int db_bulk_do_insert(db_conn_t *con, int is_last)
if (is_last || con->bulk_commit_cnt >= con->bulk_commit_max)
{
if (db_query(con, "COMMIT", 6) == NULL)
if (db_query(con, "COMMIT", 6) == NULL &&
con->sql_errno != DB_ERROR_NONE)
return 1;
con->bulk_commit_cnt = 0;
}

View File

@ -223,11 +223,12 @@ typedef enum {
typedef struct db_conn
{
db_error_t sql_errno; /* Driver-independent error code */
int drv_errno; /* Driver-specific error code */
db_driver_t *driver; /* DB driver for this connection */
void *ptr; /* Driver-specific data */
db_error_t db_errno; /* Driver-independent error code */
db_conn_state_t state; /* Connection state */
db_result_t rs; /* Result set */
db_conn_state_t state; /* Connection state */
int thread_id; /* Thread this connection belongs to */
unsigned int bulk_cnt; /* Current number of rows in bulk insert buffer */
@ -239,6 +240,7 @@ typedef struct db_conn
unsigned int bulk_commit_max; /* Maximum value of uncommitted rows */
char pad[SB_CACHELINE_PAD(sizeof(void *) * 3 + sizeof(db_error_t) +
sizeof(int) +
sizeof(db_conn_state_t) +
sizeof(db_result_t) +
sizeof(int) * 8)];

View File

@ -291,7 +291,7 @@ int attachsql_drv_prepare(db_stmt_t *stmt, const char *query, size_t len)
{
log_text(LOG_ALERT, "libAttachSQL Prepare Failed: %u:%s", attachsql_error_code(error), attachsql_error_message(error));
attachsql_error_free(error);
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
}
@ -413,11 +413,11 @@ db_error_t attachsql_drv_execute(db_stmt_t *stmt, db_result_t *rs)
{
log_text(LOG_ALERT, "libAttachSQL Execute Failed: %u:%s", attachsql_error_code(error), attachsql_error_message(error));
attachsql_error_free(error);
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
}
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
}
@ -452,17 +452,17 @@ db_error_t attachsql_drv_query(db_conn_t *sb_conn, const char *query,
if (rc == 1213 || rc == 1205 || rc == 1020)
{
attachsql_error_free(error);
return SB_DB_ERROR_RESTART_TRANSACTION;
return DB_ERROR_IGNORABLE;
}
log_text(LOG_ALERT, "libAttachSQL Query Failed: %u:%s", attachsql_error_code(error), attachsql_error_message(error));
attachsql_error_free(error);
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
}
//rs->connection->ptr= con;
DEBUG("attachsql_query \"%s\" returned %d", query, aret);
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
}
@ -580,7 +580,7 @@ int attachsql_drv_store_results(db_result_t *rs)
}
}
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
}

View File

@ -368,7 +368,7 @@ db_error_t drizzle_drv_execute(db_stmt_t *stmt, db_result_t *rs)
if (buf == NULL)
{
log_text(LOG_DEBUG, "ERROR: exiting drizzle_drv_execute(), memory allocation failure");
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
need_realloc = 0;
}
@ -390,15 +390,15 @@ db_error_t drizzle_drv_execute(db_stmt_t *stmt, db_result_t *rs)
}
buf[j] = '\0';
con->db_errno = drizzle_drv_query(con, buf, j, rs);
con->sql_errno = drizzle_drv_query(con, buf, j, rs);
free(buf);
if (con->db_errno != SB_DB_ERROR_NONE)
if (con->sql_errno != DB_ERROR_NONE)
{
log_text(LOG_DEBUG, "ERROR: exiting drizzle_drv_execute(), database error");
return con->db_errno;
return con->sql_errno;
}
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
}
@ -432,11 +432,11 @@ db_error_t drizzle_drv_query(db_conn_t *sb_conn, const char *query, size_t len,
*/
if (rc == 1213 || rc == 1205 ||
rc == 1020)
return SB_DB_ERROR_RESTART_TRANSACTION;
return DB_ERROR_IGNORABLE;
log_text(LOG_ALERT, "Drizzle Query Failed: %u:%s",
drizzle_result_error_code(result),
drizzle_result_error(result));
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
else if (ret != DRIZZLE_RETURN_OK)
{
@ -446,14 +446,14 @@ db_error_t drizzle_drv_query(db_conn_t *sb_conn, const char *query, size_t len,
strlen(query), query);
log_text(LOG_ALERT, "Error %d %s", drizzle_con_errno(con),
drizzle_con_error(con));
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
DEBUG("drizzle_query \"%s\" returned %d", query, ret);
if (result == NULL)
{
DEBUG("drizzle_query(%p, \"%s\") == NULL",con,query);
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
@ -461,7 +461,7 @@ db_error_t drizzle_drv_query(db_conn_t *sb_conn, const char *query, size_t len,
rs->nrows= drizzle_result_row_count(result);
DEBUG("drizzle_result_row_count(%p) == %d",result,rs->nrows);
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
}
@ -506,7 +506,7 @@ int drizzle_drv_store_results(db_result_t *rs)
if (con == NULL || res == NULL)
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
if (args.buffer == BUFFER_ALL)
@ -522,7 +522,7 @@ int drizzle_drv_store_results(db_result_t *rs)
log_text(LOG_ALERT, "Error %d %s",
drizzle_con_errno(con),
drizzle_con_error(con));
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
while ((column = drizzle_column_next(res)) != NULL)
column_info(column);
@ -550,7 +550,7 @@ int drizzle_drv_store_results(db_result_t *rs)
log_text(LOG_ALERT, "Error %d %s",
drizzle_con_errno(con),
drizzle_con_error(con));
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
if (column == NULL)
break;
@ -574,7 +574,7 @@ int drizzle_drv_store_results(db_result_t *rs)
log_text(LOG_ALERT, "Error %d %s",
drizzle_con_errno(con),
drizzle_con_error(con));
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
if (row == NULL)
@ -596,7 +596,7 @@ int drizzle_drv_store_results(db_result_t *rs)
log_text(LOG_ALERT, "Error %d %s",
drizzle_con_errno(con),
drizzle_con_error(con));
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
if (row_num == 0)
@ -634,7 +634,7 @@ int drizzle_drv_store_results(db_result_t *rs)
log_text(LOG_ALERT, "Error %d %s",
drizzle_con_errno(con),
drizzle_con_error(con));
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
if (args.buffer == BUFFER_FIELD)
@ -646,7 +646,7 @@ int drizzle_drv_store_results(db_result_t *rs)
} /* while (1) */
}
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
}

View File

@ -702,16 +702,17 @@ static db_error_t check_error(db_conn_t *sb_con, const char *func,
{
sb_list_item_t *pos;
unsigned int tmp;
unsigned int error;
db_mysql_conn_t *db_mysql_con = (db_mysql_conn_t *) sb_con->ptr;
MYSQL *con = db_mysql_con->mysql;
error = mysql_errno(con);
DEBUG("mysql_errno(%p) = %u", con, error);
const unsigned int error = mysql_errno(con);
DEBUG("mysql_errno(%p) = %u", con, sb_con->drv_errno);
sb_con->drv_errno = (int) error;
/*
Check if the error code is specified in --mysql-ignore-errors, and return
SB_DB_ERROR_RESTART_TRANSACTION if so, or SB_DB_ERROR_FAILED otherwise
DB_ERROR_IGNORABLE if so, or DB_ERROR_FATAL otherwise
*/
SB_LIST_FOR_EACH(pos, args.ignored_errors)
{
@ -747,10 +748,10 @@ static db_error_t check_error(db_conn_t *sb_con, const char *func,
}
if (query)
log_text(LOG_ALERT, "%s returned error %u (%s) for query '%s'",
log_text(LOG_FATAL, "%s returned error %u (%s) for query '%s'",
func, error, mysql_error(con), query);
else
log_text(LOG_ALERT, "%s returned error %u (%s)",
log_text(LOG_FATAL, "%s returned error %u (%s)",
func, error, mysql_error(con));
*type = DB_STAT_ERROR;
@ -1000,8 +1001,12 @@ int mysql_drv_close(db_stmt_t *stmt)
if (stmt->ptr == NULL)
return 1;
DEBUG("mysql_stmt_close(%p)", stmt->ptr);
return mysql_stmt_close(stmt->ptr);
int rc = mysql_stmt_close(stmt->ptr);
DEBUG("mysql_stmt_close(%p) = %d", stmt->ptr, rc);
stmt->ptr = NULL;
return rc;
}

View File

@ -603,29 +603,29 @@ db_error_t ora_drv_execute(db_stmt_t *stmt, db_result_t *rs)
(void)rs; /* unused */
if (db_con == NULL)
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
ora_con = db_con->ptr;
if (ora_con == NULL)
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
if (!stmt->emulated)
{
if (stmt->ptr == NULL)
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
if (ora_stmt->type == STMT_TYPE_BEGIN)
{
rc = OCITransStart(ora_con->svchp, ora_con->errhp, 3600, OCI_TRANS_NEW);
CHECKERR("OCITransStart");
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
}
else if (ora_stmt->type == STMT_TYPE_COMMIT)
{
rc = OCITransCommit(ora_con->svchp, ora_con->errhp, OCI_DEFAULT);
CHECKERR("OCITransCommit");
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
}
else if (ora_stmt->type == STMT_TYPE_SELECT)
iters = 0;
@ -636,7 +636,7 @@ db_error_t ora_drv_execute(db_stmt_t *stmt, db_result_t *rs)
NULL, NULL, OCI_DEFAULT);
CHECKERR("OCIStmtExecute");
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
}
/* Build the actual query string from parameters list */
@ -651,7 +651,7 @@ db_error_t ora_drv_execute(db_stmt_t *stmt, db_result_t *rs)
buf = realloc(buf, buflen);
if (buf == NULL)
{
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
need_realloc = 0;
}
@ -673,15 +673,15 @@ db_error_t ora_drv_execute(db_stmt_t *stmt, db_result_t *rs)
}
buf[j] = '\0';
db_con->db_errno = ora_drv_query(db_con, buf, j, rs);
db_con->sql_errno = ora_drv_query(db_con, buf, j, rs);
free(buf);
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
error:
log_text(LOG_FATAL, "failed query was: '%s'", stmt->query);
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}
@ -707,14 +707,14 @@ db_error_t ora_drv_query(db_conn_t *sb_conn, const char *query, size_t len,
rc = OCITransStart(ora_con->svchp, ora_con->errhp, 3600, OCI_TRANS_NEW);
CHECKERR("OCITransStart");
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
}
else if (type == STMT_TYPE_COMMIT)
{
rc = OCITransCommit(ora_con->svchp, ora_con->errhp, OCI_DEFAULT);
CHECKERR("OCITransCommit");
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
}
else if (type == STMT_TYPE_SELECT)
iters = 0;
@ -736,14 +736,14 @@ db_error_t ora_drv_query(db_conn_t *sb_conn, const char *query, size_t len,
OCIHandleFree(stmt, OCI_HTYPE_STMT);
return SB_DB_ERROR_NONE;
return DB_ERROR_NONE;
error:
log_text(LOG_FATAL, "failed query was: '%s'", query);
if (stmt != NULL)
OCIHandleFree(stmt, OCI_HTYPE_STMT);
return SB_DB_ERROR_FAILED;
return DB_ERROR_FATAL;
}

View File

@ -14,19 +14,34 @@
-- along with this program; if not, write to the Free Software
-- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-- ----------------------------------------------------------------------
-- Pseudo-random number generation API
-- ----------------------------------------------------------------------
ffi = require("ffi")
-- ----------------------------------------------------------------------
-- Main event loop. This is a Lua version of sysbench.c:thread_run()
-- ----------------------------------------------------------------------
function thread_run(thread_id)
local success, ret
while sysbench.more_events() do
sysbench.event_start()
event(thread_id)
repeat
local success, ret = pcall(event, thread_id)
if not success then
if (type(ret) == "table" and
ret.errcode == sysbench.error.RESTART_EVENT) then
else
error(ret, 2) -- propagate unknown errors
end
end
until success
-- Stop the benchmark if event() returns a non-nil value
if ret then
break
end
sysbench.event_stop()
end
end

View File

@ -69,7 +69,14 @@ typedef enum
DB_STAT_MAX
} sql_stat_type;
typedef struct db_conn sql_connection;
typedef struct
{
sql_error_t sql_errno; /* Driver-independent error code */
int drv_errno; /* Driver-specific error code */
const char opaque[?];
} sql_connection;
typedef struct db_stmt sql_statement;
/* Result set definition */
@ -143,7 +150,7 @@ local sql_value = ffi.typeof('sql_value');
local sql_row = ffi.typeof('sql_row');
sysbench.sql.type =
{
{
NONE = ffi.C.SQL_TYPE_NONE,
TINYINT = ffi.C.SQL_TYPE_TINYINT,
SMALLINT = ffi.C.SQL_TYPE_SMALLINT,
@ -159,71 +166,115 @@ sysbench.sql.type =
VARCHAR = ffi.C.SQL_TYPE_VARCHAR
}
local function check_type(vtype, var, func)
if var == nil or not ffi.istype(vtype, var) then
error(string.format("bad argument '%s' to %s() where a '%s' was expected",
var, func, vtype),
3)
end
end
-- Initialize a given SQL driver and return a handle to it to create
-- connections. A nil driver name (i.e. no function argument) initializes the
-- default driver, i.e. the one specified with --db-driver on the command line.
function sysbench.sql.driver(driver_name)
local drv = ffi.C.db_create(driver_name)
if (drv == nil) then
error("failed to initialize the DB driver")
error("failed to initialize the DB driver", 2)
end
return ffi.gc(drv, ffi.C.db_destroy)
end
function sysbench.sql.connect(driver)
check_type(sql_driver, driver, 'sysbench.sql.connect')
local con = ffi.C.db_connection_create(driver)
-- sql_driver methods
local driver_methods = {}
function driver_methods.connect(self)
local con = ffi.C.db_connection_create(self)
if con == nil then
error('connection creation failed')
error("connection creation failed", 2)
end
return ffi.gc(con, ffi.C.db_connection_free)
end
function sysbench.sql.disconnect(con)
check_type(sql_connection, con, 'sysbench.sql.disconnect')
return ffi.C.db_connection_close(con) == 0
function driver_methods.name(self)
return ffi.string(self.sname)
end
function sysbench.sql.query(con, query)
check_type(sql_connection, con, 'sysbench.sql.query')
return ffi.C.db_query(con, query, #query)
-- sql_driver metatable
local driver_mt = {
__index = driver_methods,
__gc = ffi.C.db_destroy,
__tostring = function() return '<sql_driver>' end,
}
ffi.metatype("sql_driver", driver_mt)
-- sql_connection methods
local connection_methods = {}
function connection_methods.disconnect(self)
return assert(ffi.C.db_connection_close(self) == 0)
end
function sysbench.sql.bulk_insert_init(con, query)
check_type(sql_connection, con, 'sysbench.sql.bulk_insert_init')
return ffi.C.db_bulk_insert_init(con, query, #query)
function connection_methods.check_error(self, rs)
if rs ~= nil or self.sql_errno == sysbench.sql.error.NONE then
return rs
end
if self.sql_errno == sysbench.sql.error.IGNORABLE then
-- Throw an error containing the SQL error number provided by the SQL
-- driver. It can be caught by the user script to do some extra steps to
-- restart a transaction (e.g. reprepare statements after a
-- reconnect). Otherwise it will be caught by thread_run() in
-- sysbench.lua, in which case the event will be restarted.
error({ errcode = sysbench.error.RESTART_EVENT,
sql_errno = self.sql_errno,
drv_errno = self.drv_errno})
end
error(string.format("Fatal SQL error, drv_errno = %d", self.drv_errno))
end
function sysbench.sql.bulk_insert_next(con, val)
check_type(sql_connection, con, 'sysbench.sql.bulk_insert_next')
return ffi.C.db_bulk_insert_next(con, val, #val)
function connection_methods.query(self, query)
local rs = ffi.C.db_query(self, query, #query)
return self:check_error(rs)
end
function sysbench.sql.bulk_insert_done(con)
check_type(sql_connection, con, 'sysbench.sql.bulk_insert_done')
return ffi.C.db_bulk_insert_done(con)
function connection_methods.bulk_insert_init(self, query)
return ffi.C.db_bulk_insert_init(self, query, #query)
end
function sysbench.sql.prepare(con, query)
check_type(sql_connection, con, 'sysbench.sql.prepare')
local stmt = ffi.C.db_prepare(con, query, #query)
function connection_methods.bulk_insert_next(self, val)
return ffi.C.db_bulk_insert_next(self, val, #val)
end
function connection_methods.bulk_insert_done(self)
return ffi.C.db_bulk_insert_done(self)
end
function connection_methods.prepare(self, query)
local stmt = ffi.C.db_prepare(self, query, #query)
return stmt
end
function sysbench.sql.bind_create(stmt, btype, maxlen)
-- A convenience wrapper around sql_connection:query() and
-- sql_result:fetch_row(). Executes the specified query and returns the first
-- row from the result set, if available, or nil otherwise
function connection_methods.query_row(self, query)
local rs = self:query(query)
if rs == nil then
return nil
end
return unpack(rs:fetch_row(), 1, rs.nfields)
end
-- sql_connection metatable
local connection_mt = {
__index = connection_methods,
__tostring = function() return '<sql_connection>' end,
__gc = ffi.C.db_connection_free,
}
ffi.metatype("sql_connection", connection_mt)
-- sql_statement methods
local statement_methods = {}
function statement_methods.bind_create(self, btype, maxlen)
local sql_type = sysbench.sql.type
local buf, buflen, datalen, isnull
check_type(sql_statement, stmt, 'sysbench.sql.bind_create')
if btype == sql_type.TINYINT or
btype == sql_type.SMALLINT or
btype == sql_type.INT or
@ -245,7 +296,7 @@ function sysbench.sql.bind_create(stmt, btype, maxlen)
buf = ffi.new('char[?]', maxlen)
buflen = maxlen
else
error("Unsupported argument type: " .. btype)
error("Unsupported argument type: " .. btype, 2)
end
datalen = ffi.new('unsigned long[1]')
@ -254,82 +305,91 @@ function sysbench.sql.bind_create(stmt, btype, maxlen)
return ffi.new(sql_bind, btype, buf, datalen, buflen, isnull)
end
function sysbench.sql.bind_set(bind, value)
local sql_type = sysbench.sql.type
local btype = bind.type
function statement_methods.bind_param(self, ...)
local len = select('#', ...)
if len < 1 then return nil end
check_type(sql_bind, bind, 'sysbench.sql.bind_set')
return ffi.C.db_bind_param(self,
ffi.new("sql_bind[?]", len, {...}),
len)
end
function statement_methods.execute(self)
return ffi.C.db_execute(self)
end
function statement_methods.close(self)
return ffi.C.db_close(self)
end
-- sql_statement metatable
local statement_mt = {
__index = statement_methods,
__tostring = function() return '<sql_statement>' end,
__gc = ffi.C.db_close,
}
ffi.metatype("sql_statement", statement_mt)
-- sql_bind methods
local bind_methods = {}
function bind_methods.set(self, value)
local sql_type = sysbench.sql.type
local btype = self.type
if (value == nil) then
bind.is_null[0] = true
self.is_null[0] = true
return
end
bind.is_null[0] = false
self.is_null[0] = false
if btype == sql_type.TINYINT or
btype == sql_type.SMALLINT or
btype == sql_type.INT or
btype == sql_type.BIGINT
then
ffi.copy(bind.buffer, ffi.new('int64_t[1]', value), 8)
ffi.copy(self.buffer, ffi.new('int64_t[1]', value), 8)
elseif btype == sql_type.FLOAT or
btype == sql_type.DOUBLE
then
ffi.copy(bind.buffer, ffi.new('double[1]', value), 8)
ffi.copy(self.buffer, ffi.new('double[1]', value), 8)
elseif btype == sql_type.CHAR or
btype == sql_type.VARCHAR
then
local len = #value
len = bind.max_len < len and bind.max_len or len
ffi.copy(bind.buffer, value, len)
bind.data_len[0] = len
len = self.max_len < len and self.max_len or len
ffi.copy(self.buffer, value, len)
self.data_len[0] = len
else
error("Unsupported argument type: " .. btype)
error("Unsupported argument type: " .. btype, 2)
end
end
function sysbench.sql.bind_destroy(bind)
check_type(sql_bind, bind, 'sysbench.sql.bind_destroy')
end
-- sql_bind metatable
local bind_mt = {
__index = bind_methods,
__tostring = function() return '<sql_bind>' end,
}
ffi.metatype("sql_bind", bind_mt)
function sysbench.sql.bind_param(stmt, ...)
local len = #{...}
local i
check_type(sql_statement, stmt, 'sysbench.sql.bind_param')
return ffi.C.db_bind_param(stmt,
ffi.new("sql_bind[?]", len, {...}),
len)
end
function sysbench.sql.execute(stmt)
check_type(sql_statement, stmt, 'sysbench.sql.execute')
return ffi.C.db_execute(stmt)
end
function sysbench.sql.close(stmt)
check_type(sql_statement, stmt, 'sysbench.sql.close')
return ffi.C.db_close(stmt)
end
-- sql_result methods
local result_methods = {}
-- Returns the next row of values from a result set, or nil if there are no more
-- rows to fetch. Values are returned as an array, i.e. a table with numeric
-- indexes starting from 1. The total number of values (i.e. fields in a result
-- set) can be obtained from sql_result.nfields.
function sysbench.sql.fetch_row(rs)
check_type(sql_result, rs, 'sysbench.sql.fetch_row')
function result_methods.fetch_row(self)
local res = {}
local row = ffi.C.db_fetch_row(rs)
local row = ffi.C.db_fetch_row(self)
if row == nil then
return nil
end
local i
for i = 0, rs.nfields-1 do
for i = 0, self.nfields-1 do
if row.values[i].ptr ~= nil then -- not a NULL value
res[i+1] = ffi.string(row.values[i].ptr, tonumber(row.values[i].len))
end
@ -338,89 +398,20 @@ function sysbench.sql.fetch_row(rs)
return res
end
function sysbench.sql.query_row(con, query)
check_type(sql_connection, con, 'sysbench.sql.query_row')
local rs = con:query(query)
if rs == nil then
return nil
end
return unpack(rs:fetch_row(), 1, rs.nfields)
function result_methods.free(self)
return assert(ffi.C.db_free_results(self) == 0, "db_free_results() failed")
end
function sysbench.sql.free_results(result)
check_type(sql_result, result, 'sysbench.sql.free_results')
return ffi.C.db_free_results(result)
end
function sysbench.sql.driver_name(driver)
return ffi.string(driver.sname)
end
-- sql_driver metatable
local driver_mt = {
__index = {
connect = sysbench.sql.connect,
name = sysbench.sql.driver_name,
},
__gc = ffi.C.db_destroy,
__tostring = function() return '<sql_driver>' end,
}
ffi.metatype("sql_driver", driver_mt)
-- sql_connection metatable
local connection_mt = {
__index = {
disconnect = sysbench.sql.disconnect,
query = sysbench.sql.query,
query_row = sysbench.sql.query_row,
bulk_insert_init = sysbench.sql.bulk_insert_init,
bulk_insert_next = sysbench.sql.bulk_insert_next,
bulk_insert_done = sysbench.sql.bulk_insert_done,
prepare = sysbench.sql.prepare,
},
__tostring = function() return '<sql_connection>' end,
__gc = ffi.C.db_connection_free,
}
ffi.metatype("struct db_conn", connection_mt)
-- sql_statement metatable
local statement_mt = {
__index = {
bind_param = sysbench.sql.bind_param,
bind_create = sysbench.sql.bind_create,
execute = sysbench.sql.execute,
close = sysbench.sql.close
},
__tostring = function() return '<sql_statement>' end,
__gc = sysbench.sql.close,
}
ffi.metatype("struct db_stmt", statement_mt)
-- sql_bind metatable
local bind_mt = {
__index = {
set = sysbench.sql.bind_set
},
__tostring = function() return '<sql_bind>' end,
__gc = sysbench.sql.bind_destroy
}
ffi.metatype("sql_bind", bind_mt)
-- sql_results metatable
local result_mt = {
__index = {
fetch_row = sysbench.sql.fetch_row,
free = sysbench.sql.free_results,
},
__index = result_methods,
__tostring = function() return '<sql_result>' end,
__gc = sysbench.sql.free_results
__gc = ffi.C.db_free_results
}
ffi.metatype("sql_result", result_mt)
-- error codes
sysbench.sql.ERROR_NONE = ffi.C.DB_ERROR_NONE
sysbench.sql.ERROR_IGNORABLE = ffi.C.DB_ERROR_IGNORABLE
sysbench.sql.ERROR_FATAL = ffi.C.DB_ERROR_FATAL
sysbench.sql.error = {}
sysbench.sql.error.NONE = ffi.C.DB_ERROR_NONE
sysbench.sql.error.IGNORABLE = ffi.C.DB_ERROR_IGNORABLE
sysbench.sql.error.FATAL = ffi.C.DB_ERROR_FATAL

View File

@ -18,6 +18,12 @@
-- Common code for OLTP benchmarks.
-- -----------------------------------------------------------------------------
function init()
assert(event ~= nil,
"this script is meant to be included by other OLTP scripts and " ..
"should not be called directly.")
end
-- Generate strings of random digits with 11-digit groups separated by dashes
function get_c_value()
-- 10 groups, 119 characters
@ -124,10 +130,6 @@ CREATE TABLE sbtest%d(
end
function thread_init()
assert(event ~= nil,
"This script is meant to be included by other OLTP scripts and " ..
"should not be called directly.")
set_vars()
drv = sysbench.sql.driver()

View File

@ -33,7 +33,7 @@
/*
Auto-generated headers for internal scripts. If you add a new header here,
make sure it is also added to internal_sources.
make sure it is also added to the internal_sources array below.
*/
#include "lua/internal/sysbench.lua.h"
#include "lua/internal/sysbench.rand.lua.h"
@ -49,20 +49,6 @@
#define INIT_FUNC "init"
#define DONE_FUNC "done"
/* Macros to call Lua functions */
#define CALL_ERROR(L, name) \
do { \
const char *err = lua_tostring(L, -1); \
log_text(LOG_FATAL, "failed to execute function `%s': %s", \
name, err ? err : "(null)"); \
} while (0)
#define CHECK_CONNECTION(L, ctxt) \
do { \
if (ctxt->con == NULL) \
luaL_error(L, "Uninitialized database connection"); \
} while(0);
/* Interpreter context */
typedef struct {
@ -100,6 +86,11 @@ typedef struct {
unsigned int *source_len;
} internal_script_t;
typedef enum {
SB_LUA_ERROR_NONE,
SB_LUA_ERROR_RESTART_EVENT
} sb_lua_error_t;
/* Lua interpreter states */
static lua_State **states CK_CC_CACHELINE;
@ -113,6 +104,20 @@ static const char *sb_lua_script_path CK_CC_CACHELINE;
static TLS sb_lua_ctxt_t *tls_lua_ctxt CK_CC_CACHELINE;
/* Database driver */
static TLS db_driver_t *db_driver;
/* List of pre-loaded internal scripts */
static internal_script_t internal_scripts[] = {
{"sysbench.rand.lua", sysbench_rand_lua, &sysbench_rand_lua_len},
{"sysbench.lua", sysbench_lua, &sysbench_lua_len},
{"sysbench.sql.lua", sysbench_sql_lua, &sysbench_sql_lua_len},
{NULL, NULL, 0}
};
/* Main (global) interpreter state */
static lua_State *gstate;
/* Lua test operations */
static int sb_lua_op_init(void);
@ -129,20 +134,6 @@ static sb_operations_t lua_ops = {
.done = sb_lua_op_done
};
/* Main (global) interpreter state */
static lua_State *gstate;
/* Database driver */
static TLS db_driver_t *db_driver;
/* List of pre-loaded internal scripts */
static internal_script_t internal_scripts[] = {
{"sysbench.rand.lua", sysbench_rand_lua, &sysbench_rand_lua_len},
{"sysbench.lua", sysbench_lua, &sysbench_lua_len},
{"sysbench.sql.lua", sysbench_sql_lua, &sysbench_sql_lua_len},
{NULL, NULL, 0}
};
/* Lua test commands */
static int sb_lua_cmd_prepare(void);
static int sb_lua_cmd_cleanup(void);
@ -172,7 +163,30 @@ static int sb_lua_more_events(lua_State *);
static int sb_lua_event_start(lua_State *);
static int sb_lua_event_stop(lua_State *);
unsigned int sb_lua_table_size(lua_State *, int);
static unsigned int sb_lua_table_size(lua_State *, int);
static void call_error(lua_State *L, const char *name)
{
const char * const err = lua_tostring(L, -1);
log_text(LOG_FATAL, "`%s' function failed: %s", name,
err ? err : "(not a string)");
lua_pop(L, 1);
}
static void check_connection(lua_State *L, sb_lua_ctxt_t *ctxt)
{
if (ctxt->con == NULL)
luaL_error(L, "Uninitialized database connection");
}
static bool func_available(lua_State *L, const char *func)
{
lua_getglobal(L, func);
bool rc = !lua_isnil(L, -1) && lua_type(L, -1) == LUA_TFUNCTION;
lua_pop(L, 1);
return rc;
}
/* Load a specified Lua script */
@ -190,17 +204,13 @@ sb_test_t *sb_load_lua(const char *testname)
goto error;
/* Test commands */
lua_getglobal(gstate, PREPARE_FUNC);
if (!lua_isnil(gstate, -1))
if (func_available(gstate, PREPARE_FUNC))
sbtest.cmds.prepare = &sb_lua_cmd_prepare;
lua_pop(gstate, 1);
lua_getglobal(gstate, CLEANUP_FUNC);
if (!lua_isnil(gstate, -1))
if (func_available(gstate, CLEANUP_FUNC))
sbtest.cmds.cleanup = &sb_lua_cmd_cleanup;
lua_getglobal(gstate, HELP_FUNC);
if (!lua_isnil(gstate, -1) && lua_isfunction(gstate, -1))
if (func_available(gstate, HELP_FUNC))
sbtest.cmds.help = &sb_lua_cmd_help;
/* Test operations */
@ -210,11 +220,9 @@ sb_test_t *sb_load_lua(const char *testname)
sbtest.ops.thread_done = &sb_lua_op_thread_done;
lua_getglobal(gstate, THREAD_RUN_FUNC);
if (!lua_isnil(gstate, -1))
if (func_available(gstate, THREAD_RUN_FUNC))
sbtest.ops.thread_run = &sb_lua_op_thread_run;
sbtest.ops.print_stats = &sb_lua_op_print_stats;
/* Allocate per-thread interpreters array */
@ -242,11 +250,18 @@ int sb_lua_op_init(void)
{
if (lua_pcall(gstate, 0, 0, 0))
{
CALL_ERROR(gstate, INIT_FUNC);
call_error(gstate, INIT_FUNC);
return 1;
}
}
if (!func_available(gstate, EVENT_FUNC))
{
log_text(LOG_FATAL, "cannot find the event() function in %s",
sb_lua_script_path);
return 1;
}
return 0;
}
@ -285,7 +300,7 @@ int sb_lua_op_thread_init(int thread_id)
if (lua_pcall(L, 1, 1, 0))
{
CALL_ERROR(L, THREAD_INIT_FUNC);
call_error(L, THREAD_INIT_FUNC);
return 1;
}
}
@ -302,7 +317,7 @@ int sb_lua_op_thread_run(int thread_id)
if (lua_pcall(L, 1, 1, 0))
{
CALL_ERROR(L, THREAD_RUN_FUNC);
call_error(L, THREAD_RUN_FUNC);
return 1;
}
@ -315,14 +330,13 @@ int sb_lua_op_thread_done(int thread_id)
int rc = 0;
lua_getglobal(L, THREAD_DONE_FUNC);
if (!lua_isnil(L, -1))
{
lua_pushnumber(L, thread_id);
if (lua_pcall(L, 1, 1, 0))
{
CALL_ERROR(L, THREAD_DONE_FUNC);
call_error(L, THREAD_DONE_FUNC);
rc = 1;
}
}
@ -347,7 +361,7 @@ int sb_lua_op_done(void)
{
if (lua_pcall(gstate, 0, 0, 0))
{
CALL_ERROR(gstate, DONE_FUNC);
call_error(gstate, DONE_FUNC);
return 1;
}
}
@ -365,6 +379,7 @@ static int load_internal_scripts(lua_State *L)
{
log_text(LOG_FATAL, "failed to load internal module '%s': %s",
s->name, lua_tostring(L, -1));
lua_pop(L, 1);
return 1;
}
@ -478,11 +493,19 @@ lua_State *sb_lua_new_state(int thread_id)
SB_LUA_VAR("DB_ERROR_RESTART_TRANSACTION", DB_ERROR_IGNORABLE);
SB_LUA_VAR("DB_ERROR_FAILED", DB_ERROR_FATAL);
lua_settable(L, -3); /* sysbench.db */
lua_pushstring(L, "error");
lua_newtable(L);
SB_LUA_VAR("NONE", SB_LUA_ERROR_NONE);
SB_LUA_VAR("RESTART_EVENT", SB_LUA_ERROR_RESTART_EVENT);
lua_settable(L, -3); /* sysbench.error */
#undef SB_LUA_VAR
#undef SB_LUA_FUNC
lua_settable(L, -3); /* sysbench.db */
lua_setglobal(L, "sysbench");
luaL_newmetatable(L, "sysbench.stmt");
@ -634,24 +657,41 @@ int sb_lua_db_disconnect(lua_State *L)
return 0;
}
/*
Throw an error with the { errcode = RESTART_EVENT } table. This will make
thread_run() restart the event.
*/
static void throw_restart_event(lua_State *L)
{
log_text(LOG_DEBUG, "Ignored error encountered, restarting transaction");
lua_newtable(L);
lua_pushstring(L, "errcode");
lua_pushnumber(L, SB_LUA_ERROR_RESTART_EVENT);
lua_settable(L, -3);
lua_error(L); /* this call never returns */
}
int sb_lua_db_query(lua_State *L)
{
const char *query;
db_result_t *rs;
size_t len;
const char *query;
db_result_t *rs;
size_t len;
if (tls_lua_ctxt->con == NULL)
sb_lua_db_connect(L);
query = luaL_checklstring(L, 1, &len);
rs = db_query(tls_lua_ctxt->con, query, len);
if (rs == NULL)
{
lua_pushnumber(L, tls_lua_ctxt->con->db_errno);
lua_error(L);
}
db_conn_t * const con = tls_lua_ctxt->con;
db_free_results(rs);
query = luaL_checklstring(L, 1, &len);
rs = db_query(con, query, len);
if (rs == NULL && con->sql_errno == DB_ERROR_IGNORABLE)
throw_restart_event(L);
if (rs != NULL)
db_free_results(rs);
return 0;
}
@ -676,7 +716,7 @@ int sb_lua_db_bulk_insert_next(lua_State *L)
const char *query;
size_t len;
CHECK_CONNECTION(L, tls_lua_ctxt);
check_connection(L, tls_lua_ctxt);
query = luaL_checklstring(L, 1, &len);
if (db_bulk_insert_next(tls_lua_ctxt->con, query, len))
@ -687,7 +727,7 @@ int sb_lua_db_bulk_insert_next(lua_State *L)
int sb_lua_db_bulk_insert_done(lua_State *L)
{
CHECK_CONNECTION(L, tls_lua_ctxt);
check_connection(L, tls_lua_ctxt);
db_bulk_insert_done(tls_lua_ctxt->con);
@ -726,7 +766,7 @@ int sb_lua_db_bind_param(lua_State *L)
db_bind_t *binds;
char needs_rebind = 0;
CHECK_CONNECTION(L, tls_lua_ctxt);
check_connection(L, tls_lua_ctxt);
stmt = (sb_lua_db_stmt_t *)luaL_checkudata(L, 1, "sysbench.stmt");
luaL_argcheck(L, stmt != NULL, 1, "prepared statement expected");
@ -798,7 +838,7 @@ int sb_lua_db_bind_result(lua_State *L)
db_bind_t *binds;
char needs_rebind = 0;
CHECK_CONNECTION(L, tls_lua_ctxt);
check_connection(L, tls_lua_ctxt);
stmt = (sb_lua_db_stmt_t *)luaL_checkudata(L, 1, "sysbench.stmt");
luaL_argcheck(L, stmt != NULL, 1, "prepared statement expected");
@ -874,7 +914,7 @@ int sb_lua_db_execute(lua_State *L)
const char *str;
sb_lua_bind_t *param;
CHECK_CONNECTION(L, tls_lua_ctxt);
check_connection(L, tls_lua_ctxt);
stmt = (sb_lua_db_stmt_t *)luaL_checkudata(L, 1, "sysbench.stmt");
luaL_argcheck(L, stmt != NULL, 1, "prepared statement expected");
@ -952,11 +992,10 @@ int sb_lua_db_execute(lua_State *L)
}
ptr = db_execute(stmt->ptr);
if (ptr == NULL)
if (ptr == NULL && tls_lua_ctxt->con->sql_errno == DB_ERROR_IGNORABLE)
{
stmt->rs = NULL;
lua_pushnumber(L, tls_lua_ctxt->con->db_errno);
lua_error(L);
throw_restart_event(L);
}
else
{
@ -975,7 +1014,7 @@ int sb_lua_db_close(lua_State *L)
sb_lua_db_stmt_t *stmt;
unsigned int i;
CHECK_CONNECTION(L, tls_lua_ctxt);
check_connection(L, tls_lua_ctxt);
stmt = (sb_lua_db_stmt_t *)luaL_checkudata(L, 1, "sysbench.stmt");
luaL_argcheck(L, stmt != NULL, 1, "prepared statement expected");
@ -1000,7 +1039,7 @@ int sb_lua_db_store_results(lua_State *L)
{
sb_lua_db_rs_t *rs;
CHECK_CONNECTION(L, tls_lua_ctxt);
check_connection(L, tls_lua_ctxt);
rs = (sb_lua_db_rs_t *)luaL_checkudata(L, 1, "sysbench.rs");
luaL_argcheck(L, rs != NULL, 1, "result set expected");
@ -1014,7 +1053,7 @@ int sb_lua_db_free_results(lua_State *L)
{
sb_lua_db_rs_t *rs;
CHECK_CONNECTION(L, tls_lua_ctxt);
check_connection(L, tls_lua_ctxt);
rs = (sb_lua_db_rs_t *)luaL_checkudata(L, 1, "sysbench.rs");
luaL_argcheck(L, rs != NULL, 1, "result set expected");

View File

@ -49,7 +49,7 @@ void sb_timer_init(sb_timer_t *t)
void sb_timer_reset(sb_timer_t *t)
{
t->min_time = 0xffffffffffffffffULL;
t->min_time = UINT64_MAX;
t->max_time = 0;
t->sum_time = 0;
t->events = 0;
@ -113,6 +113,8 @@ uint64_t sb_timer_sum(sb_timer_t *t)
uint64_t sb_timer_min(sb_timer_t *t)
{
if (t->events == 0)
return 0;
return t->min_time;
}

View File

@ -18,9 +18,19 @@ function thread_init()
end
function event()
local e, m
print("drv:name() = " .. drv:name())
print("SQL types:")
for k,v in pairs(sysbench.sql.type) do print(k .. " = " .. v) end
print()
print('--')
print("SQL error codes:")
for k,v in pairs(sysbench.sql.error) do print(k .. " = " .. v) end
print('--')
e, m = pcall(sysbench.sql.driver, "non-existing")
print(m)
con:query("DROP TABLE IF EXISTS t")
con:query("CREATE TABLE t(a INT)")
@ -53,15 +63,43 @@ function event()
local stmt = con:prepare("UPDATE t SET a = a + ?, b = ?")
local a = stmt:bind_create(sysbench.sql.type.INT)
local b = stmt:bind_create(sysbench.sql.type.CHAR, 10)
print(a)
print(b)
e, m = pcall(stmt.bind_create, stmt, sysbench.sql.type.DATE)
print(m)
print(stmt:bind_param())
stmt:bind_param(a, b)
a:set(100)
rs = stmt:execute()
rs1 = stmt:execute()
print(rs1)
a:set(200)
b:set("01234567890")
rs = stmt:execute()
rs:free()
rs2 = stmt:execute()
row = rs2:fetch_row()
print(rs2)
rs2:free()
e, m = pcall(rs2.free, rs)
print(m)
e, m = pcall(rs1.free, rs)
print(m)
stmt:close()
print('--')
con:disconnect()
e, m = pcall(con.query, con, "SELECT 1")
print(m)
con:disconnect()
print('--')
con = drv:connect()
rs = con:query("SELECT MIN(a), MAX(a), MIN(b), MAX(b) FROM t")
print(rs.nfields)
for i = 1, rs.nrows do
@ -72,8 +110,21 @@ function event()
end
EOF
# Failed connection handling
sysbench $SB_ARGS run
SB_ARGS="--verbosity=1 --test=$CRAMTMP/api_sql.lua --max-requests=1 --num-threads=1 $DB_DRIVER_ARGS"
cat >$CRAMTMP/api_sql.lua <<EOF
function event()
local drv = sysbench.sql.driver()
local e,m = pcall(drv.connect, drv)
print(m)
end
EOF
sysbench $SB_ARGS --mysql-host="non-existing" --pgsql-host="non-existing" run
########################################################################
# Multiple connections test
########################################################################

View File

@ -59,3 +59,24 @@ Basic Lua API tests
$ sysbench $SB_ARGS help
tid:(nil) help()
########################################################################
Error handling
########################################################################
# Syntax errors in the script
$ cat >$CRAMTMP/api_basic.lua <<EOF
> foo
> EOF
$ sysbench $SB_ARGS run
PANIC: unprotected error in call to Lua API (*/api_basic.lua:2: '=' expected near '<eof>') (glob)
[1]
# Missing event function
$ cat >$CRAMTMP/api_basic.lua <<EOF
> function foo()
> end
> EOF
$ sysbench $SB_ARGS run
FATAL: cannot find the event() function in */api_basic.lua (glob)
[1]

View File

@ -5,6 +5,7 @@ SQL Lua API + MySQL tests
$ . ${SBTEST_INCDIR}/mysql_common.sh
$ . ${SBTEST_INCDIR}/api_sql_common.sh
drv:name() = mysql
SQL types:
NONE = 0
INT = 3
CHAR = 11
@ -18,7 +19,14 @@ SQL Lua API + MySQL tests
DATE = 8
DATETIME = 9
DOUBLE = 6
--
SQL error codes:
NONE = 0
FATAL = 2
IGNORABLE = 1
--
FATAL: invalid database driver name: 'non-existing'
failed to initialize the DB driver
100
--
--
@ -29,9 +37,28 @@ SQL Lua API + MySQL tests
--
nil 2
--
<sql_bind>
<sql_bind>
Unsupported argument type: 8
nil
<sql_result>
ALERT: attempt to fetch row from an empty result set
<sql_result>
ALERT: attempt to free an invalid result set
db_free_results() failed
db_free_results() failed
--
(last message repeated 1 times)
ALERT: attempt to use an already closed connection
[string "sysbench.sql.lua"]:*: Fatal SQL error, drv_errno = 0 (glob)
ALERT: attempt to close an already closed connection
--
4
301 400 0123456789 0123456789
--
FATAL: unable to connect to MySQL server on host 'non-existing', port 3306, aborting...
FATAL: error 2005: Unknown MySQL server host 'non-existing' (0)
connection creation failed
1
2
3