Support for mysql session variable commands, for example, SET AUTOCOMMIT=0

Session commands are identified by query clasisfier, and added to the session command property list in router client session object.
Session commands are then executed in existing backend servers but only one of them will reply to client.
This commit is contained in:
VilhoRaatikka
2014-02-26 20:07:09 +02:00
parent b673108276
commit 3e111534a1
6 changed files with 826 additions and 184 deletions

View File

@ -1335,3 +1335,4 @@ int gw_write(
return w;
}

View File

@ -53,6 +53,8 @@ typedef struct spinlock {
#define SPINLOCK_INIT { 0 }
#endif
#define SPINLOCK_IS_LOCKED(l) ((l)->lock != 0 ? true : false)
extern void spinlock_init(SPINLOCK *lock);
extern void spinlock_acquire(SPINLOCK *lock);
extern int spinlock_acquire_nowait(SPINLOCK *lock);

View File

@ -24,7 +24,7 @@
* @verbatim
* Revision History
*
* bazaar..
* See GitHub https://github.com/skysql/MaxScale
*
* @endverbatim
*/
@ -41,25 +41,88 @@ typedef struct backend {
int backend_conn_count; /*< Number of connections to the server */
} BACKEND;
typedef struct rses_property_st rses_property_t;
typedef struct router_client_session ROUTER_CLIENT_SES;
typedef enum rses_property_type_t {
RSES_PROP_TYPE_UNDEFINED=0,
RSES_PROP_TYPE_FIRST,
RSES_PROP_TYPE_SESCMD=RSES_PROP_TYPE_FIRST,
RSES_PROP_TYPE_LAST=RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1
} rses_property_type_t;
/**
* Session variable command
*/
typedef struct mysql_sescmd_st {
#if defined(SS_DEBUG)
skygw_chk_t my_sescmd_chk_top;
#endif
ROUTER_CLIENT_SES* my_sescmd_rsession; /*< parent router session */
rses_property_t* my_sescmd_prop; /*< parent property */
GWBUF* my_sescmd_buf; /*< client query reference */
bool my_sescmd_is_replied; /*< is cmd replied to client */
#if defined(SS_DEBUG)
skygw_chk_t my_sescmd_chk_tail;
#endif
} mysql_sescmd_t;
/**
* Property structure
*/
struct rses_property_st {
#if defined(SS_DEBUG)
skygw_chk_t rses_prop_chk_top;
#endif
SPINLOCK rses_prop_lock; /*< protect property content */
int rses_prop_refcount;
rses_property_type_t rses_prop_type;
union rses_prop_data {
mysql_sescmd_t sescmd;
void* placeholder; /*< to be removed due new type */
} rses_prop_data;
rses_property_t* rses_prop_next; /*< next property of same type */
#if defined(SS_DEBUG)
skygw_chk_t rses_prop_chk_tail;
#endif
};
typedef struct sescmd_cursor_st {
rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */
mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */
bool scmd_cur_active; /*< true if command is being executed */
} sescmd_cursor_t;
typedef enum backend_type_t {
BE_UNDEFINED=-1,
BE_MASTER,
BE_SLAVE,
BE_COUNT
} backend_type_t;
/**
* The client session structure used within this router.
*/
typedef struct router_client_session {
struct router_client_session {
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_top;
#endif
SPINLOCK rses_lock; /*< protects rses_deleted */
int rses_versno; /*< even = no active update, else odd */
bool rses_closed; /*< true when closeSession is called */
BACKEND* be_slave; /*< Slave backend used by client session */
BACKEND* be_master; /*< Master backend used by client session */
DCB* slave_dcb; /*< Slave connection */
DCB* master_dcb; /*< Master connection */
/** Properties listed by their type */
rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT];
BACKEND* rses_backend[BE_COUNT];/*< Backends used by client session */
DCB* rses_dcb[BE_COUNT];
/*< cursor is pointer and status variable to current session command */
sescmd_cursor_t rses_cursor[BE_COUNT];
struct router_client_session* next;
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
#endif
} ROUTER_CLIENT_SES;
};
/**
* The statistics for this router instance
@ -88,5 +151,4 @@ typedef struct router_instance {
struct router_instance* next; /*< Next router on the list */
} ROUTER_INSTANCE;
#endif
#endif /*< _RWSPLITROUTER_H */

View File

@ -119,10 +119,10 @@ static ROUTER_OBJECT MyObject = {
errorReply
};
static bool rses_begin_router_action(
static bool rses_begin_locked_router_action(
ROUTER_CLIENT_SES* rses);
static void rses_exit_router_action(
static void rses_end_locked_router_action(
ROUTER_CLIENT_SES* rses);
static SPINLOCK instlock;
@ -498,13 +498,13 @@ DCB* backend_dcb;
/**
* Lock router client session for secure read and update.
*/
if (rses_begin_router_action(router_cli_ses))
if (rses_begin_locked_router_action(router_cli_ses))
{
backend_dcb = router_cli_ses->backend_dcb;
router_cli_ses->backend_dcb = NULL;
router_cli_ses->rses_closed = true;
/** Unlock */
rses_exit_router_action(router_cli_ses);
rses_end_locked_router_action(router_cli_ses);
/**
* Close the backend server connection
@ -550,14 +550,14 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
/**
* Lock router client session for secure read of DCBs
*/
rses_is_closed = !(rses_begin_router_action(router_cli_ses));
rses_is_closed = !(rses_begin_locked_router_action(router_cli_ses));
}
if (!rses_is_closed)
{
backend_dcb = router_cli_ses->backend_dcb;
/** unlock */
rses_exit_router_action(router_cli_ses);
rses_end_locked_router_action(router_cli_ses);
}
if (rses_is_closed || backend_dcb == NULL)
@ -696,7 +696,7 @@ errorReply(
* @details (write detailed description here)
*
*/
static bool rses_begin_router_action(
static bool rses_begin_locked_router_action(
ROUTER_CLIENT_SES* rses)
{
bool succp = false;
@ -731,7 +731,7 @@ return_succp:
* @details (write detailed description here)
*
*/
static void rses_exit_router_action(
static void rses_end_locked_router_action(
ROUTER_CLIENT_SES* rses)
{
CHK_CLIENT_RSES(rses);

View File

@ -82,12 +82,62 @@ static ROUTER_OBJECT MyObject = {
clientReply,
NULL
};
static bool rses_begin_router_action(
static bool rses_begin_locked_router_action(
ROUTER_CLIENT_SES* rses);
static void rses_exit_router_action(
static void rses_end_locked_router_action(
ROUTER_CLIENT_SES* rses);
static void mysql_sescmd_done(
mysql_sescmd_t* sescmd);
static mysql_sescmd_t* mysql_sescmd_init (
rses_property_t* rses_prop,
GWBUF* sescmd_buf,
ROUTER_CLIENT_SES* rses);
static rses_property_t* mysql_sescmd_get_property(
mysql_sescmd_t* scmd);
static rses_property_t* rses_property_init(
rses_property_type_t prop_type);
static void rses_property_add(
ROUTER_CLIENT_SES* rses,
rses_property_t* prop);
static void rses_property_done(
rses_property_t* prop);
static sescmd_cursor_t* rses_get_sescmd_cursor(
ROUTER_CLIENT_SES* rses,
backend_type_t be_type);
static bool execute_sescmd_in_backend(
ROUTER_CLIENT_SES* rses,
backend_type_t be_type);
static bool sescmd_cursor_is_active(
sescmd_cursor_t* sescmd_cursor);
static GWBUF* sescmd_cursor_get_querybuf(
sescmd_cursor_t* scur);
static mysql_sescmd_t* sescmd_cursor_get_command(
sescmd_cursor_t* scur);
static bool sescmd_cursor_next(
sescmd_cursor_t* scur);
static void sescmd_reply_to_client(
DCB* client_dcb,
mysql_sescmd_t* scmd);
static bool cont_exec_sescmd_in_backend(
ROUTER_CLIENT_SES* rses,
backend_type_t be_type);
static SPINLOCK instlock;
static ROUTER_INSTANCE* instances;
@ -267,8 +317,7 @@ static void* newSession(
ROUTER* router_inst,
SESSION* session)
{
BACKEND* be_slave = NULL;
BACKEND* be_master = NULL;
BACKEND* local_backend[BE_COUNT];
ROUTER_CLIENT_SES* client_rses;
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst;
bool succp;
@ -281,16 +330,31 @@ static void* newSession(
ss_dassert(false);
return NULL;
}
memset(local_backend, 0, BE_COUNT*sizeof(void*));
spinlock_init(&client_rses->rses_lock);
#if defined(SS_DEBUG)
client_rses->rses_chk_top = CHK_NUM_ROUTER_SES;
client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES;
#endif
/** store pointers to sescmd list to both cursors */
client_rses->rses_cursor[BE_MASTER].scmd_cur_active = false;
client_rses->rses_cursor[BE_MASTER].scmd_cur_cmd = NULL;
client_rses->rses_cursor[BE_MASTER].scmd_cur_ptr_property =
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
client_rses->rses_cursor[BE_SLAVE].scmd_cur_active = false;
client_rses->rses_cursor[BE_SLAVE].scmd_cur_cmd = NULL;
client_rses->rses_cursor[BE_SLAVE].scmd_cur_ptr_property =
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
/**
* Find a backend server to connect to. This is the extent of the
* load balancing algorithm we need to implement for this simple
* connection router.
*/
succp = search_backend_servers(&be_master, &be_slave, router);
succp = search_backend_servers(&local_backend[BE_MASTER],
&local_backend[BE_SLAVE],
router);
/** Both Master and Slave must be found */
if (!succp) {
@ -300,11 +364,12 @@ static void* newSession(
/**
* Open the slave connection.
*/
client_rses->slave_dcb = dcb_connect(be_slave->backend_server,
client_rses->rses_dcb[BE_SLAVE] = dcb_connect(
local_backend[BE_SLAVE]->backend_server,
session,
be_slave->backend_server->protocol);
local_backend[BE_SLAVE]->backend_server->protocol);
if (client_rses->slave_dcb == NULL) {
if (client_rses->rses_dcb[BE_SLAVE] == NULL) {
ss_dassert(session->refcount == 1);
free(client_rses);
return NULL;
@ -312,13 +377,14 @@ static void* newSession(
/**
* Open the master connection.
*/
client_rses->master_dcb = dcb_connect(be_master->backend_server,
client_rses->rses_dcb[BE_MASTER] = dcb_connect(
local_backend[BE_MASTER]->backend_server,
session,
be_master->backend_server->protocol);
if (client_rses->master_dcb == NULL)
local_backend[BE_MASTER]->backend_server->protocol);
if (client_rses->rses_dcb[BE_MASTER] == NULL)
{
/** Close slave connection first. */
client_rses->slave_dcb->func.close(client_rses->slave_dcb);
client_rses->rses_dcb[BE_SLAVE]->func.close(client_rses->rses_dcb[BE_SLAVE]);
free(client_rses);
return NULL;
}
@ -326,11 +392,11 @@ static void* newSession(
* We now have a master and a slave server with the least connections.
* Bump the connection counts for these servers.
*/
atomic_add(&be_slave->backend_conn_count, 1);
atomic_add(&be_master->backend_conn_count, 1);
atomic_add(&local_backend[BE_SLAVE]->backend_conn_count, 1);
atomic_add(&local_backend[BE_MASTER]->backend_conn_count, 1);
client_rses->be_slave = be_slave;
client_rses->be_master = be_master;
client_rses->rses_backend[BE_SLAVE] = local_backend[BE_SLAVE];
client_rses->rses_backend[BE_MASTER] = local_backend[BE_MASTER];
router->stats.n_sessions += 1;
/**
@ -371,16 +437,16 @@ static void closeSession(
/**
* Lock router client session for secure read and update.
*/
if (rses_begin_router_action(router_cli_ses))
if (rses_begin_locked_router_action(router_cli_ses))
{
slave_dcb = router_cli_ses->slave_dcb;
router_cli_ses->slave_dcb = NULL;
master_dcb = router_cli_ses->master_dcb;
router_cli_ses->master_dcb = NULL;
slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE];
router_cli_ses->rses_dcb[BE_SLAVE] = NULL;
master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
router_cli_ses->rses_dcb[BE_MASTER] = NULL;
router_cli_ses->rses_closed = true;
/** Unlock */
rses_exit_router_action(router_cli_ses);
rses_end_locked_router_action(router_cli_ses);
/**
* Close the backend server connections
@ -403,14 +469,15 @@ static void freeSession(
{
ROUTER_CLIENT_SES* router_cli_ses;
ROUTER_INSTANCE* router;
int i;
router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session;
router = (ROUTER_INSTANCE *)router_instance;
atomic_add(&router_cli_ses->be_slave->backend_conn_count, -1);
atomic_add(&router_cli_ses->be_master->backend_conn_count, -1);
atomic_add(&router_cli_ses->be_slave->backend_server->stats.n_current, -1);
atomic_add(&router_cli_ses->be_master->backend_server->stats.n_current, -1);
atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_conn_count, -1);
atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_conn_count, -1);
atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_server->stats.n_current, -1);
atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_server->stats.n_current, -1);
spinlock_acquire(&router->lock);
@ -429,6 +496,22 @@ static void freeSession(
}
spinlock_release(&router->lock);
/**
* For each property type, walk through the list, finalize properties
* and free the allocated memory.
*/
for (i=RSES_PROP_TYPE_FIRST; i<RSES_PROP_TYPE_COUNT; i++)
{
rses_property_t* p = router_cli_ses->rses_properties[i];
rses_property_t* q = p;
while (p != NULL)
{
q = p->rses_prop_next;
rses_property_done(p);
p = q;
}
}
/*
* We are no longer in the linked list, free
* all the memory and other resources associated
@ -473,6 +556,7 @@ static int routeQuery(
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
bool rses_is_closed;
rses_property_t* prop;
CHK_CLIENT_RSES(router_cli_ses);
@ -529,15 +613,15 @@ static int routeQuery(
/**
* Lock router client session for secure read of DCBs
*/
rses_is_closed = !(rses_begin_router_action(router_cli_ses));
rses_is_closed = !(rses_begin_locked_router_action(router_cli_ses));
}
if (!rses_is_closed)
{
master_dcb = router_cli_ses->master_dcb;
slave_dcb = router_cli_ses->slave_dcb;
master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE];
/** unlock */
rses_exit_router_action(router_cli_ses);
rses_end_locked_router_action(router_cli_ses);
}
if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL))
@ -566,11 +650,10 @@ static int routeQuery(
case QUERY_TYPE_WRITE:
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, routing to "
"Master.",
"%lu [routeQuery:rwsplit] Query type\t%s, "
"routing to Master.",
pthread_self(),
STRQTYPE(qtype))));
ret = master_dcb->func.write(master_dcb, querybuf);
atomic_add(&inst->stats.n_master, 1);
@ -584,14 +667,41 @@ static int routeQuery(
"routing to Slave.",
pthread_self(),
STRQTYPE(qtype))));
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
/** Log error to debug */
goto return_ret;
}
/** If session command is being executed, route to master */
if (sescmd_cursor_is_active(rses_get_sescmd_cursor(
router_cli_ses,
BE_MASTER)))
{
ret = master_dcb->func.write(master_dcb, querybuf);
atomic_add(&inst->stats.n_master, 1);
}
else{
ret = slave_dcb->func.write(slave_dcb, querybuf);
atomic_add(&inst->stats.n_slave, 1);
}
rses_end_locked_router_action(router_cli_ses);
goto return_ret;
break;
case QUERY_TYPE_SESSION_WRITE:
/**
* Execute in backends used by current router session.
* Save session variable commands to router session property
* struct. Thus, they
* can be replayed in backends which are started and joined later.
*
* Suppress OK packets sent to MaxScale by slaves.
*
* DOES THIS ALL APPLY TO COM_QUIT AS WELL??
*
*/
/**
* Update connections which are used in this session.
*
@ -653,12 +763,56 @@ static int routeQuery(
break;
case COM_QUERY:
ret = master_dcb->func.session(master_dcb, (void *)querybuf);
slave_dcb->func.session(slave_dcb, (void *)bufcopy);
/**
* 1. Create new property of type RSES_PROP_TYPE_SESCMD.
* 2. Add property to the ROUTER_CLIENT_SES struct of
* this router session.
* 3. For each backend, and for each non-executed
* sescmd:
* call execution of current sescmd in
* all backends as long as both have executed
* them all.
* Execution call is dcb->func.session.
* All sescmds are executed when its return value is
* NULL, otherwise it is a pointer to next property.
*/
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
/**
* Additional reference is created to querybuf to
* prevent it from being released before properties
* are cleaned up as a part of router sessionclean-up.
*/
mysql_sescmd_init(prop,
gwbuf_clone(querybuf),
router_cli_ses);
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
rses_property_done(prop);
goto return_ret;
}
/** Add sescmd property to router client session */
rses_property_add(router_cli_ses, prop);
/** Execute session command in master */
if (!execute_sescmd_in_backend(router_cli_ses, BE_MASTER))
{
/** Log error */
}
/** Execute session command in slave */
if (!execute_sescmd_in_backend(router_cli_ses, BE_SLAVE))
{
/** Log error */
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
break;
default:
ret = master_dcb->func.session(master_dcb, (void *)querybuf);
ret = master_dcb->func.session(master_dcb,
(void *)querybuf);
slave_dcb->func.session(slave_dcb, (void *)bufcopy);
break;
} /**< switch by packet type */
@ -707,7 +861,7 @@ return_ret:
* @details (write detailed description here)
*
*/
static bool rses_begin_router_action(
static bool rses_begin_locked_router_action(
ROUTER_CLIENT_SES* rses)
{
bool succp = false;
@ -725,6 +879,10 @@ static bool rses_begin_router_action(
succp = true;
return_succp:
if (!succp)
{
/** log that router session was closed */
}
return succp;
}
@ -742,7 +900,7 @@ return_succp:
* @details (write detailed description here)
*
*/
static void rses_exit_router_action(
static void rses_end_locked_router_action(
ROUTER_CLIENT_SES* rses)
{
CHK_CLIENT_RSES(rses);
@ -811,8 +969,11 @@ static void clientReply(
DCB* backend_dcb)
{
DCB* master_dcb;
DCB* slave_dcb;
DCB* client_dcb;
ROUTER_CLIENT_SES* router_cli_ses;
sescmd_cursor_t* scur = NULL;
backend_type_t be_type = BE_UNDEFINED;
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(router_cli_ses);
@ -821,34 +982,80 @@ static void clientReply(
* Lock router client session for secure read of router session members.
* Note that this could be done without lock by using version #
*/
if (rses_begin_router_action(router_cli_ses))
if (!rses_begin_locked_router_action(router_cli_ses))
{
master_dcb = router_cli_ses->master_dcb;
goto lock_failed;
}
master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE];
/** Unlock */
rses_exit_router_action(router_cli_ses);
rses_end_locked_router_action(router_cli_ses);
client_dcb = backend_dcb->session->client;
if (backend_dcb != NULL &&
backend_dcb->command == ROUTER_CHANGE_SESSION)
/**
* 1. Check if backend received reply to sescmd.
* 2. Check sescmd's state whether OK_PACKET has been
* sent to client already and if not, lock property cursor,
* reply to client, and move property cursor forward. Finally
* release the lock.
* 3. If reply for this sescmd is sent, lock property cursor
* and
*/
if (client_dcb == NULL)
{
/* if backend_dcb is master we can reply to client */
if (client_dcb != NULL &&
backend_dcb == master_dcb)
{
client_dcb->func.write(client_dcb, writebuf);
} else {
/* consume the gwbuf without writing to client */
gwbuf_consume(writebuf, gwbuf_length(writebuf));
/** Log that client was closed before reply */
return;
}
}
else if (client_dcb != NULL)
if (backend_dcb == master_dcb)
{
/* normal flow */
client_dcb->func.write(client_dcb, writebuf);
be_type = BE_MASTER;
}
else if (backend_dcb == slave_dcb)
{
be_type = BE_SLAVE;
}
scur = rses_get_sescmd_cursor(router_cli_ses, be_type);
ss_dassert(writebuf == sescmd_cursor_get_querybuf(scur));
/**
* Active cursor means that reply is from session command
* execution.
*/
if (sescmd_cursor_is_active(scur))
{
mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur);
sescmd_reply_to_client(client_dcb, scmd);
/**
* If there is a pending sescmd, start its execution.
*/
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto lock_failed;
}
/** Start execution of all pending ses commands. */
while (sescmd_cursor_next(scur))
{
if (!cont_exec_sescmd_in_backend(router_cli_ses, be_type))
{
/** Log error */
}
else
{
/** Log execution of pending sescmd */
}
}
rses_end_locked_router_action(router_cli_ses);
}
return; /*< succeed */
lock_failed:
/** log that router session couldn't be locked */
return;
}
/**
@ -878,8 +1085,7 @@ static bool search_backend_servers(
BACKEND** p_slave,
ROUTER_INSTANCE* router)
{
BACKEND* be_master = NULL;
BACKEND* be_slave = NULL;
BACKEND* local_backend[BE_COUNT] = {NULL,NULL};
int i;
bool succp = true;
@ -927,24 +1133,24 @@ static bool search_backend_servers(
* If no candidate set, set first running
* server as an initial candidate server.
*/
if (be_slave == NULL)
if (local_backend[BE_SLAVE] == NULL)
{
be_slave = be;
local_backend[BE_SLAVE] = be;
}
else if (be->backend_conn_count <
be_slave->backend_conn_count)
local_backend[BE_SLAVE]->backend_conn_count)
{
/**
* This running server has fewer
* connections, set it as a new
* candidate.
*/
be_slave = be;
local_backend[BE_SLAVE] = be;
}
else if (be->backend_conn_count ==
be_slave->backend_conn_count &&
local_backend[BE_SLAVE]->backend_conn_count &&
be->backend_server->stats.n_connections <
be_slave->backend_server->stats.n_connections)
local_backend[BE_SLAVE]->backend_server->stats.n_connections)
{
/**
* This running server has the same
@ -954,19 +1160,19 @@ static bool search_backend_servers(
* than candidate, set this server
* to candidate.
*/
be_slave = be;
local_backend[BE_SLAVE] = be;
}
}
else if (p_master != NULL &&
be_master == NULL &&
local_backend[BE_MASTER] == NULL &&
SERVER_IS_MASTER(be->backend_server))
{
be_master = be;
local_backend[BE_MASTER] = be;
}
}
}
if (p_slave != NULL && be_slave == NULL) {
if (p_slave != NULL && local_backend[BE_SLAVE] == NULL) {
succp = false;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -975,7 +1181,7 @@ static bool search_backend_servers(
i)));
}
if (p_master != NULL && be_master == NULL) {
if (p_master != NULL && local_backend[BE_MASTER] == NULL) {
succp = false;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -984,28 +1190,385 @@ static bool search_backend_servers(
i)));
}
if (be_slave != NULL) {
*p_slave = be_slave;
if (local_backend[BE_SLAVE] != NULL) {
*p_slave = local_backend[BE_SLAVE];
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [readwritesplit:search_backend_servers] Selected "
"Slave %s:%d from %d candidates.",
pthread_self(),
be_slave->backend_server->name,
be_slave->backend_server->port,
local_backend[BE_SLAVE]->backend_server->name,
local_backend[BE_SLAVE]->backend_server->port,
i)));
}
if (be_master != NULL) {
*p_master = be_master;
if (local_backend[BE_MASTER] != NULL) {
*p_master = local_backend[BE_MASTER];
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [readwritesplit:search_backend_servers] Selected "
"Master %s:%d "
"from %d candidates.",
pthread_self(),
be_master->backend_server->name,
be_master->backend_server->port,
local_backend[BE_MASTER]->backend_server->name,
local_backend[BE_MASTER]->backend_server->port,
i)));
}
return succp;
}
/**
* Create a generic router session property strcture.
*/
static rses_property_t* rses_property_init(
rses_property_type_t prop_type)
{
rses_property_t* prop;
prop = (rses_property_t*)calloc(1, sizeof(rses_property_t));
if (prop == NULL)
{
goto return_prop;
}
spinlock_init(&prop->rses_prop_lock);
prop->rses_prop_type = prop_type;
#if defined(SS_DEBUG)
prop->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
prop->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
#endif
return_prop:
CHK_RSES_PROP(prop);
return prop;
}
/**
* Create session command property.
*/
static mysql_sescmd_t* mysql_sescmd_init (
rses_property_t* rses_prop,
GWBUF* sescmd_buf,
ROUTER_CLIENT_SES* rses)
{
mysql_sescmd_t* sescmd;
CHK_RSES_PROP(rses_prop);
sescmd = &rses_prop->rses_prop_data.sescmd;
sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */
#if defined(SS_DEBUG)
sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD;
sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD;
#endif
sescmd->my_sescmd_buf = sescmd_buf; /*< session command query */
ss_dassert(sescmd_buf->sbuf->refcount > 0);
return sescmd;
}
/**
* Property is freed at the end of router client session.
*/
static void rses_property_done(
rses_property_t* prop)
{
CHK_RSES_PROP(prop);
switch (prop->rses_prop_type) {
case RSES_PROP_TYPE_SESCMD:
mysql_sescmd_done(&prop->rses_prop_data.sescmd);
break;
default:
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [rses_property_done] Unknown property type %d "
"in property %p",
pthread_self(),
prop->rses_prop_type,
prop)));
ss_dassert(false);
break;
}
free(prop);
}
static void mysql_sescmd_done(
mysql_sescmd_t* sescmd)
{
CHK_RSES_PROP(sescmd->my_sescmd_prop);
gwbuf_free(sescmd->my_sescmd_buf);
memset(sescmd, 0, sizeof(mysql_sescmd_t));
}
/**
* Add property to the router_client_ses structure's rses_properties
* array. The slot is determined by the type of property.
* In each slot there is a list of properties of similar type.
*
* Router client session must be locked.
*/
static void rses_property_add(
ROUTER_CLIENT_SES* rses,
rses_property_t* prop)
{
rses_property_t* p;
CHK_CLIENT_RSES(rses);
CHK_RSES_PROP(prop);
ss_dassert(rses->rses_lock.lock != 0);
p = rses->rses_properties[prop->rses_prop_type];
if (p == NULL)
{
rses->rses_properties[prop->rses_prop_type] = prop;
}
else
{
while (p->rses_prop_next != NULL)
{
p = p->rses_prop_next;
}
p->rses_prop_next = prop;
}
}
static void rses_begin_locked_property_action(
rses_property_t* prop)
{
CHK_RSES_PROP(prop);
spinlock_acquire(&prop->rses_prop_lock);
}
static void rses_end_locked_property_action(
rses_property_t* prop)
{
CHK_RSES_PROP(prop);
spinlock_release(&prop->rses_prop_lock);
}
/** router must be locked */
static void sescmd_cursor_set_active(
sescmd_cursor_t* sescmd_cursor,
bool value)
{
ss_dassert(SPINLOCK_IS_LOCKED(&(*sescmd_cursor->scmd_cur_ptr_property)->rses_prop_lock));
/** avoid calling unnecessarily */
ss_dassert(sescmd_cursor->scmd_cur_active != value);
sescmd_cursor->scmd_cur_active = value;
}
static void sescmd_reply_to_client(
DCB* client_dcb,
mysql_sescmd_t* scmd)
{
rses_property_t* prop;
CHK_DCB(client_dcb);
CHK_MYSQL_SESCMD(scmd);
CHK_GWBUF(scmd->my_sescmd_buf);
prop = mysql_sescmd_get_property(scmd);
rses_begin_locked_property_action(prop);
if (!scmd->my_sescmd_is_replied)
{
CHK_GWBUF(scmd->my_sescmd_buf);
client_dcb->func.write(client_dcb, scmd->my_sescmd_buf);
scmd->my_sescmd_is_replied = true;
}
rses_end_locked_property_action(prop);
}
static mysql_sescmd_t* sescmd_cursor_get_command(
sescmd_cursor_t* scur)
{
mysql_sescmd_t* scmd = scur->scmd_cur_cmd;
CHK_MYSQL_SESCMD(scmd);
return scmd;
}
/** router must be locked */
static sescmd_cursor_t* rses_get_sescmd_cursor(
ROUTER_CLIENT_SES* rses,
backend_type_t be_type)
{
CHK_CLIENT_RSES(rses);
return &rses->rses_cursor[be_type];
}
/** router must be locked */
static bool sescmd_cursor_is_active(
sescmd_cursor_t* sescmd_cursor)
{
bool succp = sescmd_cursor->scmd_cur_active;
return succp;
}
/** Router session must be locked */
static GWBUF* sescmd_cursor_get_querybuf(
sescmd_cursor_t* scur)
{
GWBUF* buf;
ss_dassert(scur->scmd_cur_cmd != NULL);
buf = scur->scmd_cur_cmd->my_sescmd_buf;
CHK_GWBUF(buf);
return buf;
}
/**
* If session command cursor is passive, sends the command to backend for
* execution.
*
* Returns true if command was sent or added successfully to the queue.
* Returns false if command sending failed or if there are no pending session
* commands.
*
* Router session must be locked.
*/
static bool execute_sescmd_in_backend(
ROUTER_CLIENT_SES* rses,
backend_type_t be_type)
{
DCB* dcb;
bool succp = true;
int rc = 0;
sescmd_cursor_t* scur;
dcb = rses->rses_dcb[be_type];
CHK_DCB(dcb);
CHK_CLIENT_RSES(rses);
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
scur = rses_get_sescmd_cursor(rses, be_type);
/** Return if there are no pending ses commands */
if (scur->scmd_cur_cmd == NULL)
{
succp = false;
}
if (!sescmd_cursor_is_active(scur))
{
sescmd_cursor_set_active(scur, true);
rc = dcb->func.session(dcb, sescmd_cursor_get_querybuf(scur));
if (rc != 0)
{
succp = false;
}
}
return succp;
}
/**
* Execute session commands when cursor is already active.
*
* Router session must be locked
*
* Return true if there was pending sescmd and sending command to
* backend server succeed. Otherwise false.
*/
static bool cont_exec_sescmd_in_backend(
ROUTER_CLIENT_SES* rses,
backend_type_t be_type)
{
bool succp = true;
DCB* dcb;
sescmd_cursor_t* scur;
int rc;
CHK_CLIENT_RSES(rses);
dcb = rses->rses_dcb[be_type];
CHK_DCB(dcb);
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
scur = rses_get_sescmd_cursor(rses, be_type);
ss_dassert(sescmd_cursor_is_active(scur));
if (scur->scmd_cur_cmd == NULL)
{
succp = false;
}
rc = dcb->func.session(dcb, sescmd_cursor_get_querybuf(scur));
if (rc != 0)
{
succp = false;
}
return succp;
}
/**
* Moves cursor to next property and copied address of its sescmd to cursor.
* Current propery must be non-null.
*
* Router session must be locked
*/
static bool sescmd_cursor_next(
sescmd_cursor_t* scur)
{
bool succp = false;
rses_property_t* prop_curr;
rses_property_t* prop_next;
ss_dassert(SPINLOCK_IS_LOCKED(&(*scur->scmd_cur_ptr_property)->rses_prop_lock));
if (scur == NULL ||
*(scur->scmd_cur_ptr_property) == NULL ||
scur->scmd_cur_cmd == NULL)
{
/** Log error to debug */
goto return_succp;
}
#if defined(SS_DEBUG)
prop_curr = *(scur->scmd_cur_ptr_property);
prop_next = prop_curr->rses_prop_next;
#endif
CHK_RSES_PROP(prop_curr);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop);
ss_dassert(scur->scmd_cur_cmd->my_sescmd_prop == prop_curr);
/** If there is a next property move forward */
if ((*scur->scmd_cur_ptr_property)->rses_prop_next != NULL)
{
scur->scmd_cur_ptr_property =
&((*scur->scmd_cur_ptr_property)->rses_prop_next);
scur->scmd_cur_cmd =
&(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd;
}
else
{
/** No more properties, can't proceed. */
goto return_succp;
}
CHK_RSES_PROP((*(scur->scmd_cur_ptr_property)));
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
ss_dassert(prop_next == *(scur->scmd_cur_ptr_property));
if (scur->scmd_cur_cmd != NULL)
{
succp = true;
}
return_succp:
return succp;
}
static rses_property_t* mysql_sescmd_get_property(
mysql_sescmd_t* scmd)
{
CHK_MYSQL_SESCMD(scmd);
return scmd->my_sescmd_prop;
}

View File

@ -117,7 +117,9 @@ typedef enum skygw_chk_t {
CHK_NUM_DCB,
CHK_NUM_PROTOCOL,
CHK_NUM_SESSION,
CHK_NUM_ROUTER_SES
CHK_NUM_ROUTER_SES,
CHK_NUM_MY_SESCMD,
CHK_NUM_ROUTER_PROPERTY
} skygw_chk_t;
# define STRBOOL(b) ((b) ? "true" : "false")
@ -428,6 +430,18 @@ typedef enum skygw_chk_t {
"Router client session has invalid check fields"); \
}
#define CHK_RSES_PROP(p) { \
ss_info_dassert((p)->rses_prop_chk_top == CHK_NUM_ROUTER_PROPERTY && \
(p)->rses_prop_chk_tail == CHK_NUM_ROUTER_PROPERTY, \
"Router property has invalid check fields"); \
}
#define CHK_MYSQL_SESCMD(s) { \
ss_info_dassert((s)->my_sescmd_chk_top == CHK_NUM_MY_SESCMD && \
(s)->my_sescmd_chk_tail == CHK_NUM_MY_SESCMD, \
"Session command has invalid check fields"); \
}
#if defined(SS_DEBUG)
bool conn_open[10240];