Remove session locks from schemarouter
The session level locks are no longer needed in the schemarouter. Also cleaned up some parts of the code.
This commit is contained in:
@ -92,8 +92,6 @@ static bool get_shard_dcb(DCB** dcb,
|
||||
SCHEMAROUTER_SESSION* rses,
|
||||
char* name);
|
||||
|
||||
static bool rses_begin_locked_router_action(SCHEMAROUTER_SESSION* rses);
|
||||
static void rses_end_locked_router_action(SCHEMAROUTER_SESSION* rses);
|
||||
static void mysql_sescmd_done(mysql_sescmd_t* sescmd);
|
||||
static mysql_sescmd_t* mysql_sescmd_init(rses_property_t* rses_prop,
|
||||
GWBUF* sescmd_buf,
|
||||
@ -990,28 +988,15 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess
|
||||
router_nservers = i;
|
||||
}
|
||||
|
||||
spinlock_init(&client_rses->rses_lock);
|
||||
client_rses->rses_backend_ref = backend_ref;
|
||||
client_rses->rses_nbackends = router_nservers;
|
||||
|
||||
/**
|
||||
* Find a backend servers to connect to.
|
||||
* This command requires that rsession's lock is held.
|
||||
*/
|
||||
if (!(succp = rses_begin_locked_router_action(client_rses)))
|
||||
{
|
||||
MXS_FREE(client_rses->rses_backend_ref);
|
||||
MXS_FREE(client_rses);
|
||||
return NULL;
|
||||
}
|
||||
/**
|
||||
* Connect to all backend servers
|
||||
*/
|
||||
succp = connect_backend_servers(backend_ref, router_nservers, session, router);
|
||||
|
||||
rses_end_locked_router_action(client_rses);
|
||||
|
||||
if (!succp || !(succp = rses_begin_locked_router_action(client_rses)))
|
||||
if (!succp || client_rses->closed)
|
||||
{
|
||||
MXS_FREE(client_rses->rses_backend_ref);
|
||||
MXS_FREE(client_rses);
|
||||
@ -1024,8 +1009,6 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess
|
||||
snprintf(client_rses->connect_db, MYSQL_DATABASE_MAXLEN + 1, "%s", db);
|
||||
}
|
||||
|
||||
rses_end_locked_router_action(client_rses);
|
||||
|
||||
atomic_add(&router->stats.sessions, 1);
|
||||
|
||||
return (void *)client_rses;
|
||||
@ -1042,55 +1025,26 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess
|
||||
*/
|
||||
static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session)
|
||||
{
|
||||
SCHEMAROUTER_SESSION* router_cli_ses;
|
||||
SCHEMAROUTER* inst;
|
||||
backend_ref_t* backend_ref;
|
||||
|
||||
MXS_DEBUG("%lu [schemarouter:closeSession]", pthread_self());
|
||||
|
||||
/**
|
||||
* router session can be NULL if newSession failed and it is discarding
|
||||
* its connections and DCB's.
|
||||
*/
|
||||
if (router_session == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
router_cli_ses = (SCHEMAROUTER_SESSION *)router_session;
|
||||
SCHEMAROUTER_SESSION *router_cli_ses = (SCHEMAROUTER_SESSION *)router_session;
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
ss_dassert(!router_cli_ses->closed);
|
||||
|
||||
inst = router_cli_ses->router;
|
||||
backend_ref = router_cli_ses->rses_backend_ref;
|
||||
/**
|
||||
* Lock router client session for secure read and update.
|
||||
*/
|
||||
if (!router_cli_ses->rses_closed && rses_begin_locked_router_action(router_cli_ses))
|
||||
if (!router_cli_ses->closed)
|
||||
{
|
||||
int i;
|
||||
/**
|
||||
* This sets router closed. Nobody is allowed to use router
|
||||
* whithout checking this first.
|
||||
*/
|
||||
router_cli_ses->rses_closed = true;
|
||||
router_cli_ses->closed = true;
|
||||
|
||||
for (i = 0; i < router_cli_ses->rses_nbackends; i++)
|
||||
for (int i = 0; i < router_cli_ses->rses_nbackends; i++)
|
||||
{
|
||||
backend_ref_t* bref = &backend_ref[i];
|
||||
backend_ref_t* bref = &router_cli_ses->rses_backend_ref[i];
|
||||
DCB* dcb = bref->bref_dcb;
|
||||
/** Close those which had been connected */
|
||||
if (BREF_IS_IN_USE(bref))
|
||||
{
|
||||
CHK_DCB(dcb);
|
||||
#if defined(SS_DEBUG)
|
||||
/**
|
||||
* session must be moved to SESSION_STATE_STOPPING state before
|
||||
* router session is closed.
|
||||
*/
|
||||
if (dcb->session != NULL)
|
||||
{
|
||||
ss_dassert(dcb->session->state == SESSION_STATE_STOPPING);
|
||||
}
|
||||
#endif
|
||||
|
||||
/** Clean operation counter in bref and in SERVER */
|
||||
while (BREF_IS_WAITING_RESULT(bref))
|
||||
{
|
||||
@ -1109,8 +1063,7 @@ static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_sessio
|
||||
|
||||
gwbuf_free(router_cli_ses->queue);
|
||||
|
||||
/** Unlock */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
SCHEMAROUTER *inst = router_cli_ses->router;
|
||||
|
||||
spinlock_acquire(&inst->lock);
|
||||
if (inst->stats.longest_sescmd < router_cli_ses->stats.longest_sescmd)
|
||||
@ -1633,103 +1586,68 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session,
|
||||
|
||||
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
||||
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
if (router_cli_ses->closed)
|
||||
{
|
||||
MXS_INFO("Route query aborted! Routing session is closed <");
|
||||
ret = 0;
|
||||
goto retblock;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!(rses_is_closed = router_cli_ses->rses_closed))
|
||||
if (router_cli_ses->init & INIT_UNINT)
|
||||
{
|
||||
if (router_cli_ses->init & INIT_UNINT)
|
||||
{
|
||||
/* Generate database list */
|
||||
gen_databaselist(inst, router_cli_ses);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* If the databases are still being mapped or if the client connected
|
||||
* with a default database but no database mapping was performed we need
|
||||
* to store the query. Once the databases have been mapped and/or the
|
||||
* default database is taken into use we can send the query forward.
|
||||
*/
|
||||
if (router_cli_ses->init & (INIT_MAPPING | INIT_USE_DB))
|
||||
{
|
||||
int init_rval = 1;
|
||||
char* querystr = modutil_get_SQL(querybuf);
|
||||
MXS_INFO("Storing query for session %p: %s",
|
||||
router_cli_ses->rses_client_dcb->session,
|
||||
querystr);
|
||||
MXS_FREE(querystr);
|
||||
querybuf = gwbuf_make_contiguous(querybuf);
|
||||
GWBUF* ptr = router_cli_ses->queue;
|
||||
|
||||
while (ptr && ptr->next)
|
||||
{
|
||||
ptr = ptr->next;
|
||||
}
|
||||
|
||||
if (ptr == NULL)
|
||||
{
|
||||
router_cli_ses->queue = querybuf;
|
||||
}
|
||||
else
|
||||
{
|
||||
ptr->next = querybuf;
|
||||
|
||||
}
|
||||
|
||||
if (router_cli_ses->init == (INIT_READY | INIT_USE_DB))
|
||||
{
|
||||
/**
|
||||
* This state is possible if a client connects with a default database
|
||||
* and the shard map was found from the router cache
|
||||
*/
|
||||
if (!handle_default_db(router_cli_ses))
|
||||
{
|
||||
init_rval = 0;
|
||||
}
|
||||
}
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
return init_rval;
|
||||
}
|
||||
/* Generate database list */
|
||||
gen_databaselist(inst, router_cli_ses);
|
||||
|
||||
}
|
||||
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
/**
|
||||
* If the databases are still being mapped or if the client connected
|
||||
* with a default database but no database mapping was performed we need
|
||||
* to store the query. Once the databases have been mapped and/or the
|
||||
* default database is taken into use we can send the query forward.
|
||||
*/
|
||||
if (router_cli_ses->init & (INIT_MAPPING | INIT_USE_DB))
|
||||
{
|
||||
int init_rval = 1;
|
||||
char* querystr = modutil_get_SQL(querybuf);
|
||||
MXS_INFO("Storing query for session %p: %s",
|
||||
router_cli_ses->rses_client_dcb->session,
|
||||
querystr);
|
||||
MXS_FREE(querystr);
|
||||
querybuf = gwbuf_make_contiguous(querybuf);
|
||||
GWBUF* ptr = router_cli_ses->queue;
|
||||
|
||||
while (ptr && ptr->next)
|
||||
{
|
||||
ptr = ptr->next;
|
||||
}
|
||||
|
||||
if (ptr == NULL)
|
||||
{
|
||||
router_cli_ses->queue = querybuf;
|
||||
}
|
||||
else
|
||||
{
|
||||
ptr->next = querybuf;
|
||||
|
||||
}
|
||||
|
||||
if (router_cli_ses->init == (INIT_READY | INIT_USE_DB))
|
||||
{
|
||||
/**
|
||||
* This state is possible if a client connects with a default database
|
||||
* and the shard map was found from the router cache
|
||||
*/
|
||||
if (!handle_default_db(router_cli_ses))
|
||||
{
|
||||
init_rval = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return init_rval;
|
||||
}
|
||||
|
||||
packet = GWBUF_DATA(querybuf);
|
||||
packet_type = packet[4];
|
||||
|
||||
if (rses_is_closed)
|
||||
{
|
||||
/**
|
||||
* MYSQL_COM_QUIT may have sent by client and as a part of backend
|
||||
* closing procedure.
|
||||
*/
|
||||
if (packet_type != MYSQL_COM_QUIT)
|
||||
{
|
||||
char* query_str = modutil_get_query(querybuf);
|
||||
|
||||
MXS_ERROR("Can't route %s:%s:\"%s\" to "
|
||||
"backend server. Router is closed.",
|
||||
STRPACKETTYPE(packet_type),
|
||||
STRQTYPE(qtype),
|
||||
(query_str == NULL ? "(empty)" : query_str));
|
||||
MXS_FREE(query_str);
|
||||
}
|
||||
ret = 0;
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
/** If buffer is not contiguous, make it such */
|
||||
if (querybuf->next != NULL)
|
||||
{
|
||||
querybuf = gwbuf_make_contiguous(querybuf);
|
||||
}
|
||||
|
||||
if (detect_show_shards(querybuf))
|
||||
{
|
||||
process_show_shards(router_cli_ses);
|
||||
@ -1783,28 +1701,20 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session,
|
||||
case MYSQL_COM_DAEMON: /**< 1d ? */
|
||||
default:
|
||||
break;
|
||||
} /**< switch by packet type */
|
||||
|
||||
}
|
||||
|
||||
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
|
||||
{
|
||||
uint8_t* packet = GWBUF_DATA(querybuf);
|
||||
unsigned char ptype = packet[4];
|
||||
size_t len = MXS_MIN(GWBUF_LENGTH(querybuf),
|
||||
MYSQL_GET_PAYLOAD_LEN((unsigned char *)querybuf->start) - 1);
|
||||
char* data = (char*)&packet[5];
|
||||
char* contentstr = strndup(data, len);
|
||||
char *sql;
|
||||
int sql_len;
|
||||
char* qtypestr = qc_typemask_to_string(qtype);
|
||||
modutil_extract_SQL(querybuf, &sql, &sql_len);
|
||||
|
||||
MXS_INFO("> Cmd: %s, type: %s, "
|
||||
"stmt: %s%s %s",
|
||||
STRPACKETTYPE(ptype),
|
||||
(qtypestr == NULL ? "N/A" : qtypestr),
|
||||
contentstr,
|
||||
MXS_INFO("> Command: %s, stmt: %.*s %s%s",
|
||||
STRPACKETTYPE(packet_type), sql_len, sql,
|
||||
(querybuf->hint == NULL ? "" : ", Hint:"),
|
||||
(querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type)));
|
||||
|
||||
MXS_FREE(contentstr);
|
||||
MXS_FREE(qtypestr);
|
||||
}
|
||||
/**
|
||||
@ -1830,8 +1740,6 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session,
|
||||
router_cli_ses->shardmap->state = SHMAP_STALE;
|
||||
spinlock_release(&router_cli_ses->shardmap->lock);
|
||||
|
||||
rses_begin_locked_router_action(router_cli_ses);
|
||||
|
||||
router_cli_ses->rses_config.last_refresh = now;
|
||||
router_cli_ses->queue = querybuf;
|
||||
int rc_refresh = 1;
|
||||
@ -1844,7 +1752,6 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session,
|
||||
{
|
||||
rc_refresh = 0;
|
||||
}
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
return rc_refresh;
|
||||
}
|
||||
extract_database(querybuf, db);
|
||||
@ -2002,14 +1909,6 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session,
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
/** Lock router session */
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
MXS_INFO("Route query aborted! Routing session is closed <");
|
||||
ret = 0;
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
if (TARGET_IS_ANY(route_target))
|
||||
{
|
||||
for (int i = 0; i < router_cli_ses->rses_nbackends; i++)
|
||||
@ -2027,7 +1926,6 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session,
|
||||
{
|
||||
/**No valid backends alive*/
|
||||
MXS_ERROR("Failed to route query, no backends are available.");
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
ret = 0;
|
||||
goto retblock;
|
||||
}
|
||||
@ -2074,10 +1972,9 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session,
|
||||
if (sescmd_cursor_is_active(scur))
|
||||
{
|
||||
ss_dassert((bref->bref_pending_cmd == NULL ||
|
||||
router_cli_ses->rses_closed));
|
||||
router_cli_ses->closed));
|
||||
bref->bref_pending_cmd = gwbuf_clone(querybuf);
|
||||
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
ret = 1;
|
||||
goto retblock;
|
||||
}
|
||||
@ -2100,7 +1997,7 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session,
|
||||
MXS_ERROR("Routing query failed.");
|
||||
}
|
||||
}
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
retblock:
|
||||
MXS_FREE(targetserver);
|
||||
gwbuf_free(querybuf);
|
||||
@ -2108,64 +2005,6 @@ retblock:
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* @node Acquires lock to router client session if it is not closed.
|
||||
*
|
||||
* Parameters:
|
||||
* @param rses - in, use
|
||||
*
|
||||
*
|
||||
* @return true if router session was not closed. If return value is true
|
||||
* it means that router is locked, and must be unlocked later. False, if
|
||||
* router was closed before lock was acquired.
|
||||
*
|
||||
*
|
||||
* @details (write detailed description here)
|
||||
*
|
||||
*/
|
||||
static bool rses_begin_locked_router_action(
|
||||
SCHEMAROUTER_SESSION* rses)
|
||||
{
|
||||
bool succp = false;
|
||||
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
if (rses->rses_closed)
|
||||
{
|
||||
goto return_succp;
|
||||
}
|
||||
spinlock_acquire(&rses->rses_lock);
|
||||
if (rses->rses_closed)
|
||||
{
|
||||
spinlock_release(&rses->rses_lock);
|
||||
goto return_succp;
|
||||
}
|
||||
succp = true;
|
||||
|
||||
return_succp:
|
||||
return succp;
|
||||
}
|
||||
|
||||
/** to be inline'd */
|
||||
/**
|
||||
* @node Releases router client session lock.
|
||||
*
|
||||
* Parameters:
|
||||
* @param rses - <usage>
|
||||
* <description>
|
||||
*
|
||||
* @return void
|
||||
*
|
||||
*
|
||||
* @details (write detailed description here)
|
||||
*
|
||||
*/
|
||||
static void rses_end_locked_router_action(SCHEMAROUTER_SESSION* rses)
|
||||
{
|
||||
CHK_CLIENT_RSES(rses);
|
||||
spinlock_release(&rses->rses_lock);
|
||||
}
|
||||
|
||||
/**
|
||||
* Diagnostics routine
|
||||
*
|
||||
@ -2240,47 +2079,31 @@ static void clientReply(MXS_ROUTER* instance,
|
||||
GWBUF* buffer,
|
||||
DCB* backend_dcb)
|
||||
{
|
||||
DCB* client_dcb;
|
||||
SCHEMAROUTER_SESSION* router_cli_ses;
|
||||
sescmd_cursor_t* scur = NULL;
|
||||
backend_ref_t* bref;
|
||||
GWBUF* writebuf = buffer;
|
||||
|
||||
router_cli_ses = (SCHEMAROUTER_SESSION *) router_session;
|
||||
SCHEMAROUTER_SESSION *router_cli_ses = (SCHEMAROUTER_SESSION *) router_session;
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
|
||||
/**
|
||||
* Lock router client session for secure read of router session members.
|
||||
* Note that this could be done without lock by using version #
|
||||
*/
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
if (router_cli_ses->closed)
|
||||
{
|
||||
while ((writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf))));
|
||||
gwbuf_free(buffer);
|
||||
return;
|
||||
}
|
||||
|
||||
/** Holding lock ensures that router session remains open */
|
||||
ss_dassert(backend_dcb->session != NULL);
|
||||
client_dcb = backend_dcb->session->client_dcb;
|
||||
|
||||
/** Unlock */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
if (client_dcb == NULL || !rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
while ((writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf))));
|
||||
return;
|
||||
}
|
||||
DCB *client_dcb = backend_dcb->session->client_dcb;
|
||||
|
||||
bref = get_bref_from_dcb(router_cli_ses, backend_dcb);
|
||||
|
||||
if (bref == NULL)
|
||||
{
|
||||
/** Unlock router session */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
while ((writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf))))
|
||||
{
|
||||
;
|
||||
}
|
||||
gwbuf_free(writebuf);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -2297,11 +2120,8 @@ static void clientReply(MXS_ROUTER* instance,
|
||||
if (router_cli_ses->init & INIT_MAPPING)
|
||||
{
|
||||
int rc = inspect_backend_mapping_states(router_cli_ses, bref, &writebuf);
|
||||
|
||||
while (writebuf && (writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf))))
|
||||
{
|
||||
;
|
||||
}
|
||||
gwbuf_free(writebuf);
|
||||
writebuf = NULL;
|
||||
|
||||
if (rc == 1)
|
||||
{
|
||||
@ -2311,15 +2131,8 @@ static void clientReply(MXS_ROUTER* instance,
|
||||
router_cli_ses->shardmap->last_updated = time(NULL);
|
||||
spinlock_release(&router_cli_ses->shardmap->lock);
|
||||
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
synchronize_shard_map(router_cli_ses);
|
||||
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if the session is reconnecting with a database name
|
||||
* that is not in the hashtable. If the database is not found
|
||||
@ -2330,7 +2143,6 @@ static void clientReply(MXS_ROUTER* instance,
|
||||
if (router_cli_ses->init & INIT_USE_DB)
|
||||
{
|
||||
bool success = handle_default_db(router_cli_ses);
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
if (!success)
|
||||
{
|
||||
dcb_close(router_cli_ses->rses_client_dcb);
|
||||
@ -2347,8 +2159,6 @@ static void clientReply(MXS_ROUTER* instance,
|
||||
router_cli_ses);
|
||||
}
|
||||
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
if (rc == -1)
|
||||
{
|
||||
dcb_close(router_cli_ses->rses_client_dcb);
|
||||
@ -2370,14 +2180,7 @@ static void clientReply(MXS_ROUTER* instance,
|
||||
route_queued_query(router_cli_ses);
|
||||
}
|
||||
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
if (writebuf)
|
||||
{
|
||||
while ((writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf))))
|
||||
{
|
||||
;
|
||||
}
|
||||
}
|
||||
gwbuf_free(writebuf);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -2385,12 +2188,11 @@ static void clientReply(MXS_ROUTER* instance,
|
||||
{
|
||||
ss_dassert(router_cli_ses->init == INIT_READY);
|
||||
route_queued_query(router_cli_ses);
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
return;
|
||||
}
|
||||
|
||||
CHK_BACKEND_REF(bref);
|
||||
scur = &bref->bref_sescmd_cur;
|
||||
sescmd_cursor_t *scur = &bref->bref_sescmd_cur;
|
||||
/**
|
||||
* Active cursor means that reply is from session command
|
||||
* execution.
|
||||
@ -2466,15 +2268,7 @@ static void clientReply(MXS_ROUTER* instance,
|
||||
router_cli_ses->rses_client_dcb->session);
|
||||
MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
|
||||
}
|
||||
/** Unlock router session */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
/** Lock router session */
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
/** Log to debug that router was closed */
|
||||
return;
|
||||
}
|
||||
/** There is one pending session command to be executed. */
|
||||
if (sescmd_cursor_is_active(scur))
|
||||
{
|
||||
@ -2520,8 +2314,6 @@ static void clientReply(MXS_ROUTER* instance,
|
||||
gwbuf_free(bref->bref_pending_cmd);
|
||||
bref->bref_pending_cmd = NULL;
|
||||
}
|
||||
/** Unlock router session */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
return;
|
||||
}
|
||||
@ -2893,7 +2685,6 @@ static void rses_property_add(SCHEMAROUTER_SESSION* rses,
|
||||
|
||||
CHK_CLIENT_RSES(rses);
|
||||
CHK_RSES_PROP(prop);
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
|
||||
|
||||
prop->rses_prop_rsession = rses;
|
||||
p = rses->rses_properties[prop->rses_prop_type];
|
||||
@ -2918,13 +2709,8 @@ static void rses_property_add(SCHEMAROUTER_SESSION* rses,
|
||||
*/
|
||||
static mysql_sescmd_t* rses_property_get_sescmd(rses_property_t* prop)
|
||||
{
|
||||
mysql_sescmd_t* sescmd;
|
||||
|
||||
CHK_RSES_PROP(prop);
|
||||
ss_dassert(prop->rses_prop_rsession == NULL ||
|
||||
SPINLOCK_IS_LOCKED(&prop->rses_prop_rsession->rses_lock));
|
||||
|
||||
sescmd = &prop->rses_prop_data.sescmd;
|
||||
mysql_sescmd_t *sescmd = &prop->rses_prop_data.sescmd;
|
||||
|
||||
if (sescmd != NULL)
|
||||
{
|
||||
@ -2992,7 +2778,6 @@ static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf,
|
||||
sescmd_cursor_t* scur;
|
||||
|
||||
scur = &bref->bref_sescmd_cur;
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
|
||||
scmd = sescmd_cursor_get_command(scur);
|
||||
|
||||
CHK_GWBUF(replybuf);
|
||||
@ -3056,7 +2841,6 @@ static mysql_sescmd_t* sescmd_cursor_get_command(sescmd_cursor_t* scur)
|
||||
{
|
||||
mysql_sescmd_t* scmd;
|
||||
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
|
||||
scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property);
|
||||
|
||||
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
|
||||
@ -3070,7 +2854,6 @@ static mysql_sescmd_t* sescmd_cursor_get_command(sescmd_cursor_t* scur)
|
||||
static bool sescmd_cursor_is_active(sescmd_cursor_t* sescmd_cursor)
|
||||
{
|
||||
bool succp;
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
|
||||
|
||||
succp = sescmd_cursor->scmd_cur_active;
|
||||
return succp;
|
||||
@ -3080,7 +2863,6 @@ static bool sescmd_cursor_is_active(sescmd_cursor_t* sescmd_cursor)
|
||||
static void sescmd_cursor_set_active(sescmd_cursor_t* sescmd_cursor,
|
||||
bool value)
|
||||
{
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
|
||||
/** avoid calling unnecessarily */
|
||||
ss_dassert(sescmd_cursor->scmd_cur_active != value);
|
||||
sescmd_cursor->scmd_cur_active = value;
|
||||
@ -3252,7 +3034,6 @@ static bool sescmd_cursor_next(sescmd_cursor_t* scur)
|
||||
|
||||
ss_dassert(scur != NULL);
|
||||
ss_dassert(*(scur->scmd_cur_ptr_property) != NULL);
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&(*(scur->scmd_cur_ptr_property))->rses_prop_rsession->rses_lock));
|
||||
|
||||
/** Illegal situation */
|
||||
if (scur == NULL ||
|
||||
@ -3333,10 +3114,10 @@ static uint64_t getCapabilities(MXS_ROUTER* instance)
|
||||
* if execute_sescmd_in_backend failed.
|
||||
*/
|
||||
static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses,
|
||||
GWBUF* querybuf,
|
||||
SCHEMAROUTER* inst,
|
||||
unsigned char packet_type,
|
||||
qc_query_type_t qtype)
|
||||
GWBUF* querybuf,
|
||||
SCHEMAROUTER* inst,
|
||||
unsigned char packet_type,
|
||||
qc_query_type_t qtype)
|
||||
{
|
||||
bool succp = false;
|
||||
rses_property_t* prop;
|
||||
@ -3362,10 +3143,9 @@ static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses,
|
||||
succp = true;
|
||||
|
||||
/** Lock router session */
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
if (router_cli_ses->closed)
|
||||
{
|
||||
succp = false;
|
||||
goto return_succp;
|
||||
return false;
|
||||
}
|
||||
|
||||
for (i = 0; i < router_cli_ses->rses_nbackends; i++)
|
||||
@ -3391,21 +3171,15 @@ static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses,
|
||||
}
|
||||
}
|
||||
}
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
gwbuf_free(querybuf);
|
||||
goto return_succp;
|
||||
return succp;
|
||||
}
|
||||
/** Lock router session */
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
succp = false;
|
||||
goto return_succp;
|
||||
}
|
||||
|
||||
if (router_cli_ses->rses_nbackends <= 0)
|
||||
{
|
||||
succp = false;
|
||||
goto return_succp;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (router_cli_ses->rses_config.max_sescmd_hist > 0 &&
|
||||
@ -3416,10 +3190,9 @@ static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses,
|
||||
router_cli_ses->rses_config.max_sescmd_hist);
|
||||
gwbuf_free(querybuf);
|
||||
atomic_add(&router_cli_ses->router->stats.n_hist_exceeded, 1);
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
router_cli_ses->rses_client_dcb->func.hangup(router_cli_ses->rses_client_dcb);
|
||||
poll_fake_hangup_event(router_cli_ses->rses_client_dcb);
|
||||
|
||||
goto return_succp;
|
||||
return succp;
|
||||
}
|
||||
|
||||
if (router_cli_ses->rses_config.disable_sescmd_hist)
|
||||
@ -3528,10 +3301,7 @@ static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses,
|
||||
succp = false;
|
||||
}
|
||||
}
|
||||
/** Unlock router session */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
return_succp:
|
||||
return succp;
|
||||
}
|
||||
|
||||
@ -3558,59 +3328,31 @@ static void handleError(MXS_ROUTER* instance,
|
||||
bool* succp)
|
||||
{
|
||||
ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
|
||||
MXS_SESSION* session;
|
||||
CHK_DCB(problem_dcb);
|
||||
SCHEMAROUTER* inst = (SCHEMAROUTER *)instance;
|
||||
SCHEMAROUTER_SESSION* rses = (SCHEMAROUTER_SESSION *)router_session;
|
||||
MXS_SESSION *session = problem_dcb->session;
|
||||
ss_dassert(session && rses);
|
||||
|
||||
CHK_DCB(problem_dcb);
|
||||
CHK_SESSION(session);
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
session = problem_dcb->session;
|
||||
|
||||
if (session == NULL || rses == NULL)
|
||||
switch (action)
|
||||
{
|
||||
case ERRACT_NEW_CONNECTION:
|
||||
*succp = handle_error_new_connection(inst, rses, problem_dcb, errmsgbuf);
|
||||
break;
|
||||
|
||||
case ERRACT_REPLY_CLIENT:
|
||||
handle_error_reply_client(session, rses, problem_dcb, errmsgbuf);
|
||||
*succp = false; /*< no new backend servers were made available */
|
||||
break;
|
||||
|
||||
default:
|
||||
*succp = false;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
CHK_SESSION(session);
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
switch (action)
|
||||
{
|
||||
case ERRACT_NEW_CONNECTION:
|
||||
{
|
||||
if (!rses_begin_locked_router_action(rses))
|
||||
{
|
||||
*succp = false;
|
||||
break;
|
||||
}
|
||||
/**
|
||||
* This is called in hope of getting replacement for
|
||||
* failed slave(s).
|
||||
*/
|
||||
*succp = handle_error_new_connection(inst,
|
||||
rses,
|
||||
problem_dcb,
|
||||
errmsgbuf);
|
||||
rses_end_locked_router_action(rses);
|
||||
break;
|
||||
}
|
||||
|
||||
case ERRACT_REPLY_CLIENT:
|
||||
{
|
||||
handle_error_reply_client(session,
|
||||
rses,
|
||||
problem_dcb,
|
||||
errmsgbuf);
|
||||
*succp = false; /*< no new backend servers were made available */
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
*succp = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
dcb_close(problem_dcb);
|
||||
}
|
||||
|
||||
@ -3652,9 +3394,7 @@ static void handle_error_reply_client(MXS_SESSION* ses,
|
||||
*/
|
||||
bool have_servers(SCHEMAROUTER_SESSION* rses)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < rses->rses_nbackends; i++)
|
||||
for (int i = 0; i < rses->rses_nbackends; i++)
|
||||
{
|
||||
if (BREF_IS_IN_USE(&rses->rses_backend_ref[i]) &&
|
||||
!BREF_IS_CLOSED(&rses->rses_backend_ref[i]))
|
||||
@ -3684,15 +3424,10 @@ static bool handle_error_new_connection(SCHEMAROUTER* inst,
|
||||
DCB* backend_dcb,
|
||||
GWBUF* errmsg)
|
||||
{
|
||||
MXS_SESSION* ses;
|
||||
unsigned char cmd = *((unsigned char*)errmsg->start + 4);
|
||||
|
||||
backend_ref_t* bref;
|
||||
bool succp;
|
||||
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
|
||||
|
||||
ses = backend_dcb->session;
|
||||
MXS_SESSION *ses = backend_dcb->session;
|
||||
CHK_SESSION(ses);
|
||||
|
||||
/**
|
||||
@ -3738,8 +3473,7 @@ static bool handle_error_new_connection(SCHEMAROUTER* inst,
|
||||
*/
|
||||
succp = connect_backend_servers(rses->rses_backend_ref,
|
||||
rses->rses_nbackends,
|
||||
ses,
|
||||
inst);
|
||||
ses, inst);
|
||||
|
||||
if (!have_servers(rses))
|
||||
{
|
||||
@ -3760,42 +3494,28 @@ return_succp:
|
||||
*
|
||||
* @return backend reference pointer if succeed or NULL
|
||||
*/
|
||||
static backend_ref_t* get_bref_from_dcb(SCHEMAROUTER_SESSION* rses,
|
||||
DCB* dcb)
|
||||
static backend_ref_t* get_bref_from_dcb(SCHEMAROUTER_SESSION *rses,
|
||||
DCB *dcb)
|
||||
{
|
||||
backend_ref_t* bref;
|
||||
int i = 0;
|
||||
CHK_DCB(dcb);
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
bref = rses->rses_backend_ref;
|
||||
|
||||
while (i < rses->rses_nbackends)
|
||||
for (int i = 0; i < rses->rses_nbackends; i++)
|
||||
{
|
||||
if (bref->bref_dcb == dcb)
|
||||
if (rses->rses_backend_ref[i].bref_dcb == dcb)
|
||||
{
|
||||
break;
|
||||
return &rses->rses_backend_ref[i];
|
||||
}
|
||||
bref++;
|
||||
i += 1;
|
||||
}
|
||||
|
||||
if (i == rses->rses_nbackends)
|
||||
{
|
||||
bref = NULL;
|
||||
}
|
||||
return bref;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static sescmd_cursor_t* backend_ref_get_sescmd_cursor(backend_ref_t* bref)
|
||||
{
|
||||
sescmd_cursor_t* scur;
|
||||
CHK_BACKEND_REF(bref);
|
||||
|
||||
scur = &bref->bref_sescmd_cur;
|
||||
CHK_SESCMD_CUR(scur);
|
||||
|
||||
return scur;
|
||||
CHK_SESCMD_CUR((&bref->bref_sescmd_cur));
|
||||
return &bref->bref_sescmd_cur;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -318,9 +318,7 @@ struct schemarouter_session
|
||||
#if defined(SS_DEBUG)
|
||||
skygw_chk_t rses_chk_top;
|
||||
#endif
|
||||
SPINLOCK rses_lock; /*< protects rses_deleted */
|
||||
int rses_versno; /*< even = no active update, else odd. not used 4/14 */
|
||||
bool rses_closed; /*< true when closeSession is called */
|
||||
bool closed; /*< true when closeSession is called */
|
||||
DCB* rses_client_dcb;
|
||||
MYSQL_session* rses_mysql_session; /*< Session client data (username, password, SHA1). */
|
||||
/** Properties listed by their type */
|
||||
@ -333,8 +331,8 @@ struct schemarouter_session
|
||||
bool rses_transaction_active; /*< Is a transaction active */
|
||||
struct schemarouter_instance *router; /*< The router instance */
|
||||
struct schemarouter_session* next; /*< List of router sessions */
|
||||
shard_map_t*
|
||||
shardmap; /*< Database hash containing names of the databases mapped to the servers that contain them */
|
||||
shard_map_t *shardmap; /*< Database hash containing names of the databases
|
||||
* mapped to the servers that contain them */
|
||||
char connect_db[MYSQL_DATABASE_MAXLEN + 1]; /*< Database the user was trying to connect to */
|
||||
char current_db[MYSQL_DATABASE_MAXLEN + 1]; /*< Current active database */
|
||||
init_mask_t init; /*< Initialization state bitmask */
|
||||
|
Reference in New Issue
Block a user