MXS-2067: Replace most SPINLOCKs

Replaced SPINLOCK with std::mutex where possible, leaving out the more
complex cases. The big offenders remaining are the binlogrouter and the
gateway.cc OpenSSL locks.
This commit is contained in:
Markus Mäkelä
2018-09-26 09:35:33 +03:00
parent 50451166bb
commit ab4f870927
17 changed files with 76 additions and 215 deletions

View File

@ -203,7 +203,7 @@ typedef struct
lua_State* global_lua_state;
char* global_script;
char* session_script;
SPINLOCK lock;
std::mutex lock;
} LUA_INSTANCE;
/**
@ -249,17 +249,16 @@ void expose_functions(lua_State* state, GWBUF** active_buffer)
*/
static MXS_FILTER* createInstance(const char* name, MXS_CONFIG_PARAMETER* params)
{
LUA_INSTANCE* my_instance;
LUA_INSTANCE* my_instance = new (std::nothrow) LUA_INSTANCE;
if ((my_instance = (LUA_INSTANCE*) MXS_CALLOC(1, sizeof(LUA_INSTANCE))) == NULL)
if (my_instance == NULL)
{
return NULL;
}
spinlock_init(&my_instance->lock);
my_instance->global_script = config_copy_string(params, "global_script");
my_instance->session_script = config_copy_string(params, "session_script");
my_instance->global_lua_state = nullptr;
if (my_instance->global_script)
{
@ -366,7 +365,7 @@ static MXS_FILTER_SESSION* newSession(MXS_FILTER* instance, MXS_SESSION* session
if (my_session && my_instance->global_lua_state)
{
spinlock_acquire(&my_instance->lock);
std::lock_guard<std::mutex> guard(my_instance->lock);
lua_getglobal(my_instance->global_lua_state, "newSession");
lua_pushstring(my_instance->global_lua_state, session->client_dcb->user);
@ -379,8 +378,6 @@ static MXS_FILTER_SESSION* newSession(MXS_FILTER* instance, MXS_SESSION* session
lua_tostring(my_instance->global_lua_state, -1));
lua_pop(my_instance->global_lua_state, -1); // Pop the error off the stack
}
spinlock_release(&my_instance->lock);
}
return (MXS_FILTER_SESSION*)my_session;
@ -415,7 +412,7 @@ static void closeSession(MXS_FILTER* instance, MXS_FILTER_SESSION* session)
if (my_instance->global_lua_state)
{
spinlock_acquire(&my_instance->lock);
std::lock_guard<std::mutex> guard(my_instance->lock);
lua_getglobal(my_instance->global_lua_state, "closeSession");
@ -426,7 +423,6 @@ static void closeSession(MXS_FILTER* instance, MXS_FILTER_SESSION* session)
lua_tostring(my_instance->global_lua_state, -1));
lua_pop(my_instance->global_lua_state, -1);
}
spinlock_release(&my_instance->lock);
}
}
@ -502,7 +498,7 @@ static int32_t clientReply(MXS_FILTER* instance, MXS_FILTER_SESSION* session, GW
if (my_instance->global_lua_state)
{
spinlock_acquire(&my_instance->lock);
std::lock_guard<std::mutex> guard(my_instance->lock);
lua_getglobal(my_instance->global_lua_state, "clientReply");
@ -512,8 +508,6 @@ static int32_t clientReply(MXS_FILTER* instance, MXS_FILTER_SESSION* session, GW
lua_tostring(my_session->lua_state, -1));
lua_pop(my_instance->global_lua_state, -1);
}
spinlock_release(&my_instance->lock);
}
return my_session->up.clientReply(my_session->up.instance,
@ -585,7 +579,7 @@ static int32_t routeQuery(MXS_FILTER* instance, MXS_FILTER_SESSION* session, GWB
if (my_instance->global_lua_state)
{
spinlock_acquire(&my_instance->lock);
std::lock_guard<std::mutex> guard(my_instance->lock);
current_global_query = queue;
lua_getglobal(my_instance->global_lua_state, "routeQuery");
@ -612,7 +606,6 @@ static int32_t routeQuery(MXS_FILTER* instance, MXS_FILTER_SESSION* session, GWB
}
current_global_query = NULL;
spinlock_release(&my_instance->lock);
}
MXS_FREE(fullquery);
@ -652,7 +645,7 @@ static void diagnostic(MXS_FILTER* instance, MXS_FILTER_SESSION* fsession, DCB*
{
if (my_instance->global_lua_state)
{
spinlock_acquire(&my_instance->lock);
std::lock_guard<std::mutex> guard(my_instance->lock);
lua_getglobal(my_instance->global_lua_state, "diagnostic");
@ -672,7 +665,6 @@ static void diagnostic(MXS_FILTER* instance, MXS_FILTER_SESSION* fsession, DCB*
lua_tostring(my_instance->global_lua_state, -1));
lua_pop(my_instance->global_lua_state, -1);
}
spinlock_release(&my_instance->lock);
}
if (my_instance->global_script)
{
@ -703,7 +695,7 @@ static json_t* diagnostic_json(const MXS_FILTER* instance, const MXS_FILTER_SESS
{
if (my_instance->global_lua_state)
{
spinlock_acquire(&my_instance->lock);
std::lock_guard<std::mutex> guard(my_instance->lock);
lua_getglobal(my_instance->global_lua_state, "diagnostic");
@ -721,7 +713,6 @@ static json_t* diagnostic_json(const MXS_FILTER* instance, const MXS_FILTER_SESS
{
lua_pop(my_instance->global_lua_state, -1);
}
spinlock_release(&my_instance->lock);
}
if (my_instance->global_script)
{

View File

@ -213,8 +213,8 @@ typedef struct
int conn_stat; /**state of the connection to the server*/
int rconn_intv; /**delay for reconnects, in seconds*/
time_t last_rconn; /**last reconnect attempt*/
SPINLOCK rconn_lock;
SPINLOCK msg_lock;
std::mutex rconn_lock;
std::mutex msg_lock;
mqmessage* messages;
enum log_trigger_t trgtype;
SRC_TRIG* src_trg;

View File

@ -749,7 +749,6 @@ AvroSession::AvroSession(Avro* instance, MXS_SESSION* session)
: dcb(session->client_dcb)
, state(AVRO_CLIENT_UNREGISTERED)
, format(AVRO_FORMAT_UNDEFINED)
, catch_lock(SPINLOCK_INIT)
, router(instance)
, file_handle(NULL)
, last_sent_pos(0)

View File

@ -153,7 +153,6 @@ public:
int state; /*< The state of this client */
enum avro_data_format format; /*< Stream JSON or Avro data */
std::string uuid; /*< Client UUID */
SPINLOCK catch_lock; /*< Event catchup lock */
Avro* router; /*< Pointer to the owning router */
MAXAVRO_FILE* file_handle; /*< Current open file handle */
uint64_t last_sent_pos;/*< The last record we sent */

View File

@ -54,9 +54,6 @@ static uint64_t getCapabilities(MXS_ROUTER* instance);
extern int execute_cmd(CLI_SESSION* cli);
static SPINLOCK instlock;
static CLI_INSTANCE* instances;
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
@ -68,8 +65,6 @@ static CLI_INSTANCE* instances;
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{
MXS_NOTICE("Initialise CLI router module");
spinlock_init(&instlock);
instances = NULL;
static MXS_ROUTER_OBJECT MyObject =
{
@ -129,16 +124,6 @@ static MXS_ROUTER* createInstance(SERVICE* service, MXS_CONFIG_PARAMETER* params
spinlock_init(&inst->lock);
inst->sessions = NULL;
/*
* We have completed the creation of the instance data, so now
* insert this router instance into the linked list of routers
* that have been created with this module.
*/
spinlock_acquire(&instlock);
inst->next = instances;
instances = inst;
spinlock_release(&instlock);
return (MXS_ROUTER*)inst;
}

View File

@ -37,7 +37,6 @@
*/
struct ROUTER_CLIENT_SES : MXS_ROUTER_SESSION
{
SPINLOCK rses_lock; /*< protects rses_deleted */
int rses_versno;/*< even = no active update, else odd */
bool rses_closed;/*< true when closeSession is called */
SERVER_REF* backend; /*< Backend used by the client session */
@ -62,7 +61,6 @@ struct ROUTER_STATS
struct ROUTER_INSTANCE : public MXS_ROUTER
{
SERVICE* service; /*< Pointer to the service using this router */
SPINLOCK lock; /*< Spinlock for the instance data */
uint64_t bitmask_and_bitvalue; /*< Lower 32-bits for bitmask and upper for bitvalue */
ROUTER_STATS stats; /*< Statistics for this router */
};

View File

@ -110,8 +110,6 @@ static void handleError(MXS_ROUTER* instance,
bool* succp);
static uint64_t getCapabilities(MXS_ROUTER* instance);
static bool configureInstance(MXS_ROUTER* instance, MXS_CONFIG_PARAMETER* params);
static bool rses_begin_locked_router_action(ROUTER_CLIENT_SES* rses);
static void rses_end_locked_router_action(ROUTER_CLIENT_SES* rses);
static SERVER_REF* get_root_master(SERVER_REF* servers);
/**
@ -248,7 +246,6 @@ static MXS_ROUTER* createInstance(SERVICE* service, MXS_CONFIG_PARAMETER* params
{
inst->service = service;
spinlock_init(&inst->lock);
inst->bitmask_and_bitvalue = 0;
if (!configureInstance((MXS_ROUTER*)inst, params))
@ -478,28 +475,11 @@ static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_
static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session)
{
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES*) router_session;
DCB* backend_dcb;
mxb_assert(router_cli_ses->backend_dcb);
/**
* Lock router client session for secure read and update.
*/
if (rses_begin_locked_router_action(router_cli_ses))
if (router_cli_ses->backend_dcb)
{
/* decrease server current connection counter */
backend_dcb = router_cli_ses->backend_dcb;
router_cli_ses->backend_dcb = NULL;
router_cli_ses->rses_closed = true;
/** Unlock */
rses_end_locked_router_action(router_cli_ses);
/**
* Close the backend server connection
*/
if (backend_dcb != NULL)
{
dcb_close(backend_dcb);
}
dcb_close(router_cli_ses->backend_dcb);
}
}
@ -596,33 +576,14 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session,
DCB* backend_dcb;
MySQLProtocol* proto = (MySQLProtocol*)router_cli_ses->client_dcb->protocol;
mxs_mysql_cmd_t mysql_command = proto->current_command;
bool rses_is_closed;
bool rses_is_closed = router_cli_ses->rses_closed;
inst->stats.n_queries++;
// Due to the streaming nature of readconnroute, this is not accurate
mxb::atomic::add(&router_cli_ses->backend->server->stats.packets, 1, mxb::atomic::RELAXED);
/** Dirty read for quick check if router is closed. */
if (router_cli_ses->rses_closed)
{
rses_is_closed = true;
}
else
{
/**
* Lock router client session for secure read of DCBs
*/
rses_is_closed = !(rses_begin_locked_router_action(router_cli_ses));
}
if (!rses_is_closed)
{
backend_dcb = router_cli_ses->backend_dcb;
/** unlock */
rses_end_locked_router_action(router_cli_ses);
}
backend_dcb = router_cli_ses->backend_dcb;
bool valid;
char* trc = NULL;
@ -794,63 +755,6 @@ static void handleError(MXS_ROUTER* instance,
*succp = false;
}
/** to be inline'd */
/**
* @node Acquires lock to router client session if it is not closed.
*
* Parameters:
* @param rses - in, use
*
*
* @return true if router session was not closed. If return value is true
* it means that router is locked, and must be unlocked later. False, if
* router was closed before lock was acquired.
*
*
* @details (write detailed description here)
*
*/
static bool rses_begin_locked_router_action(ROUTER_CLIENT_SES* rses)
{
bool succp = false;
if (rses->rses_closed)
{
goto return_succp;
}
spinlock_acquire(&rses->rses_lock);
if (rses->rses_closed)
{
spinlock_release(&rses->rses_lock);
goto return_succp;
}
succp = true;
return_succp:
return succp;
}
/** to be inline'd */
/**
* @node Releases router client session lock.
*
* Parameters:
* @param rses - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
*/
static void rses_end_locked_router_action(ROUTER_CLIENT_SES* rses)
{
spinlock_release(&rses->rses_lock);
}
static uint64_t getCapabilities(MXS_ROUTER* instance)
{
return RCAP_TYPE_RUNTIME_CONFIG;

View File

@ -47,7 +47,6 @@ SchemaRouter::SchemaRouter(SERVICE* service, SConfig config)
, m_config(config)
, m_service(service)
{
spinlock_init(&m_lock);
}
SchemaRouter::~SchemaRouter()

View File

@ -14,6 +14,7 @@
#include "schemarouter.hh"
#include <mutex>
#include <set>
#include <string>
@ -51,7 +52,7 @@ private:
SConfig m_config; /*< expanded config info from SERVICE */
ShardManager m_shard_manager; /*< Shard maps hashed by user name */
SERVICE* m_service; /*< Pointer to service */
SPINLOCK m_lock; /*< Lock for the instance data */
std::mutex m_lock; /*< Lock for the instance data */
Stats m_stats; /*< Statistics for this router */
};
}

View File

@ -111,7 +111,8 @@ void SchemaRouterSession::close()
}
}
spinlock_acquire(&m_router->m_lock);
std::lock_guard<std::mutex> guard(m_router->m_lock);
if (m_router->m_stats.longest_sescmd < m_stats.longest_sescmd)
{
m_router->m_stats.longest_sescmd = m_stats.longest_sescmd;
@ -129,8 +130,6 @@ void SchemaRouterSession::close()
m_router->m_stats.ses_average =
(ses_time + ((m_router->m_stats.sessions - 1) * m_router->m_stats.ses_average))
/ (m_router->m_stats.sessions);
spinlock_release(&m_router->m_lock);
}
}